From a8da15f4a5d42ec754366f2cb4a5de8a6ecbd513 Mon Sep 17 00:00:00 2001
From: Nuno Campos
Date: Tue, 1 Oct 2024 12:31:26 -0700
Subject: [PATCH 01/18] Add requests-toolbelt dep

---
 python/poetry.lock    | 19 +++++++++++++++++--
 python/pyproject.toml |  1 +
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/python/poetry.lock b/python/poetry.lock
index 861e0f392..4814514b9 100644
--- a/python/poetry.lock
+++ b/python/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
 
 [[package]]
 name = "annotated-types"
@@ -1374,6 +1374,20 @@ urllib3 = ">=1.21.1,<3"
 socks = ["PySocks (>=1.5.6,!=1.5.7)"]
 use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
 
+[[package]]
+name = "requests-toolbelt"
+version = "1.0.0"
+description = "A utility belt for advanced users of python-requests"
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
+files = [
+    {file = "requests-toolbelt-1.0.0.tar.gz", hash = "sha256:7681a0a3d047012b5bdc0ee37d7f8f07ebe76ab08caeccfc3921ce23c88d5bc6"},
+    {file = "requests_toolbelt-1.0.0-py2.py3-none-any.whl", hash = "sha256:cccfdd665f0a24fcf4726e690f65639d272bb0637b9b92dfd91a5568ccf6bd06"},
+]
+
+[package.dependencies]
+requests = ">=2.0.1,<3.0.0"
+
 [[package]]
 name = "ruff"
 version = "0.3.7"
@@ -1639,6 +1653,7 @@ description = "Automatically mock your HTTP interactions to simplify and speed u
 optional = false
 python-versions = ">=3.8"
 files = [
+    {file = "vcrpy-6.0.1-py2.py3-none-any.whl", hash = "sha256:621c3fb2d6bd8aa9f87532c688e4575bcbbde0c0afeb5ebdb7e14cac409edfdd"},
     {file = "vcrpy-6.0.1.tar.gz", hash = "sha256:9e023fee7f892baa0bbda2f7da7c8ac51165c1c6e38ff8688683a12a4bde9278"},
 ]
@@ -1888,4 +1903,4 @@ vcr = []
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.8.1,<4.0"
-content-hash = "efc464f40b1618531c35a40a249abccadcbd52c081f8f36ea06a6abd796ecfd9"
+content-hash = "244ff8da405f7ae894fcee75f851ef3b51eb556e5ac56a84fc3eb4930851e75c"
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 0a47ed049..8791fbdff 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -33,6 +33,7 @@ pydantic = [
 requests = "^2"
 orjson = "^3.9.14"
 httpx = ">=0.23.0,<1"
+requests-toolbelt = "^1.0.0"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^7.3.1"

From 87b5d4efae7d57320bf4762233a452b8bc20c70a Mon Sep 17 00:00:00 2001
From: Nuno Campos
Date: Tue, 1 Oct 2024 12:34:22 -0700
Subject: [PATCH 02/18] Use custom http adapter

- subclass so that we can override blocksize (the default is too small to
  maximize performance on modern network stacks)
- override the default pool size to match the maximum number of batch
  tracing threads (otherwise some threads would be left waiting for
  connections to become available)
- actually mount the adapter (the previous behavior never mounted it,
  because sessions always have http and https adapters)
---
 python/langsmith/client.py | 46 +++++++++++++++++++++++++++++++-------
 1 file changed, 38 insertions(+), 8 deletions(-)

diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index fcef0b4b6..c7d8826e8 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -435,6 +435,32 @@ class TracingQueueItem:
     item: Any = field(compare=False)
 
 
+class _LangSmithHttpAdapter(requests_adapters.HTTPAdapter):
+    __attrs__ = [
+        "max_retries",
+        "config",
+        "_pool_connections",
+        "_pool_maxsize",
+        "_pool_block",
+        "_blocksize",
+    ]
+
+    def __init__(
+        self,
+        pool_connections: int = requests_adapters.DEFAULT_POOLSIZE,
+        pool_maxsize: int = requests_adapters.DEFAULT_POOLSIZE,
+        max_retries: Union[Retry, int, None] = requests_adapters.DEFAULT_RETRIES,
+        pool_block: bool = requests_adapters.DEFAULT_POOLBLOCK,
+        blocksize: int = 16384,  # default from urllib3.BaseHTTPSConnection
+    ) -> None:
+        self._blocksize = blocksize
+        super().__init__(pool_connections, pool_maxsize, max_retries, pool_block)
+
+    def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
+        pool_kwargs["blocksize"] = self._blocksize
+        return super().init_poolmanager(connections, maxsize, block, **pool_kwargs)
+
+
 class Client:
     """Client for interacting with the LangSmith API."""
 
@@ -578,12 +604,16 @@ def __init__(
             self.tracing_queue = None
 
         # Mount the HTTPAdapter with the retry configuration.
-        adapter = requests_adapters.HTTPAdapter(max_retries=self.retry_config)
-        # Don't overwrite if session already has an adapter
-        if not self.session.get_adapter("http://"):
-            self.session.mount("http://", adapter)
-        if not self.session.get_adapter("https://"):
-            self.session.mount("https://", adapter)
+        adapter = _LangSmithHttpAdapter(
+            max_retries=self.retry_config,
+            blocksize=_BLOCKSIZE_BYTES,
+            # We need to set the pool_maxsize to a value greater than the
+            # number of threads used for batch tracing, plus 1 for other
+            # requests.
+            pool_maxsize=_AUTO_SCALE_UP_NTHREADS_LIMIT + 1,
+        )
+        self.session.mount("http://", adapter)
+        self.session.mount("https://", adapter)
         self._get_data_type_cached = functools.lru_cache(maxsize=10)(
             self._get_data_type
         )
@@ -5684,7 +5714,7 @@ def convert_prompt_to_openai_format(
         ls_utils.LangSmithError: If there is an error during the conversion process.
     """
     try:
-        from langchain_openai import ChatOpenAI
+        from langchain_openai import ChatOpenAI  # type: ignore
     except ImportError:
         raise ImportError(
             "The convert_prompt_to_openai_format function requires the langchain_openai"
@@ -5720,7 +5750,7 @@ def convert_prompt_to_anthropic_format(
         dict: The prompt in Anthropic format.
     """
     try:
-        from langchain_anthropic import ChatAnthropic
+        from langchain_anthropic import ChatAnthropic  # type: ignore
     except ImportError:
         raise ImportError(
             "The convert_prompt_to_anthropic_format function requires the "

From d60ec1ff9925c24f85dcd70a918d08394133bbe9 Mon Sep 17 00:00:00 2001
From: Nuno Campos
Date: Tue, 1 Oct 2024 12:36:57 -0700
Subject: [PATCH 03/18] Add attachments field to run dict/model

- update _run_transform to collect/drop attachments
- warn if attachments are passed to an endpoint that can't accept them,
  and in that case ingest the runs without their attachments
---
 python/langsmith/client.py  | 62 +++++++++++++++++++++++++++----------
 python/langsmith/schemas.py | 10 ++++++
 2 files changed, 55 insertions(+), 17 deletions(-)

diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index c7d8826e8..10097ee3a 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -90,6 +90,8 @@ class ZoneInfo:  # type: ignore[no-redef]
 _urllib3_logger = logging.getLogger("urllib3.connectionpool")
 
 X_API_KEY = "x-api-key"
+WARNED_ATTACHMENTS = False
+EMPTY_SEQ: tuple[Dict, ...] = ()
 
 
 def _parse_token_or_url(
@@ -811,7 +813,7 @@ def request_with_retries(
             ls_utils.FilterPoolFullWarning(host=str(self._host)),
         ]
         retry_on_: Tuple[Type[BaseException], ...]
= ( - *(retry_on or []), + *(retry_on or ()), *( ls_utils.LangSmithConnectionError, ls_utils.LangSmithAPIError, @@ -1188,17 +1190,23 @@ def _run_transform( run: Union[ls_schemas.Run, dict, ls_schemas.RunLikeDict], update: bool = False, copy: bool = False, + attachments_collector: Optional[Dict[str, ls_schemas.Attachments]] = None, ) -> dict: """Transform the given run object into a dictionary representation. Args: run (Union[ls_schemas.Run, dict]): The run object to transform. - update (bool, optional): Whether to update the run. Defaults to False. - copy (bool, optional): Whether to copy the run. Defaults to False. + update (bool, optional): Whether the payload is for an "update" event. + copy (bool, optional): Whether to deepcopy run inputs/outputs. + attachments_collector (Optional[dict[str, ls_schemas.Attachments]]): + A dictionary to collect attachments. If not passed, attachments + will be dropped. Returns: dict: The transformed run object as a dictionary. """ + global WARNED_ATTACHMENTS + if hasattr(run, "dict") and callable(getattr(run, "dict")): run_create: dict = run.dict() # type: ignore else: @@ -1225,13 +1233,22 @@ def _run_transform( "prompt", ): # Drop completely - run_create = {k: v for k, v in run_create.items() if k != "serialized"} + run_create.pop("serialized", None) else: # Drop graph - serialized = { - k: v for k, v in run_create["serialized"].items() if k != "graph" - } - run_create = {**run_create, "serialized": serialized} + run_create["serialized"].pop("graph", None) + + # Collect or drop attachments + if attachments := run_create.get("attachments", None): + if attachments_collector is not None: + attachments_collector[run_create["id"]] = attachments + elif not WARNED_ATTACHMENTS: + WARNED_ATTACHMENTS = True + logger.warning( + "You're trying to submit a run with attachments, but your current" + " LangSmith integration doesn't support it. Please contact the " + " LangChain team for assitance on how to upgrade." 
+ ) return run_create @@ -1410,8 +1427,10 @@ def batch_ingest_runs( if not create and not update: return # transform and convert to dicts - create_dicts = [self._run_transform(run) for run in create or []] - update_dicts = [self._run_transform(run, update=True) for run in update or []] + create_dicts = [self._run_transform(run) for run in create or EMPTY_SEQ] + update_dicts = [ + self._run_transform(run, update=True) for run in update or EMPTY_SEQ + ] # combine post and patch dicts where possible if update_dicts and create_dicts: create_by_id = {run["id"]: run for run in create_dicts} @@ -1453,8 +1472,7 @@ def batch_ingest_runs( size_limit_bytes = (info.batch_ingest_config or {}).get( "size_limit_bytes" - # 20 MB max by default - ) or 20_971_520 + ) or _SIZE_LIMIT_BYTES # Get orjson fragments to avoid going over the max request size partial_body = { "post": [_dumps_json(run) for run in raw_body["post"]], @@ -5570,6 +5588,7 @@ def _tracing_thread_handle_batch( client: Client, tracing_queue: Queue, batch: List[TracingQueueItem], + use_multipart: bool, ) -> None: create = [it.item for it in batch if it.action == "create"] update = [it.item for it in batch if it.action == "update"] @@ -5585,15 +5604,18 @@ def _tracing_thread_handle_batch( tracing_queue.task_done() +_SIZE_LIMIT_BYTES = 20_971_520 # 20MB by default _AUTO_SCALE_UP_QSIZE_TRIGGER = 1000 _AUTO_SCALE_UP_NTHREADS_LIMIT = 16 _AUTO_SCALE_DOWN_NEMPTY_TRIGGER = 4 +_BLOCKSIZE_BYTES = 1024 * 1024 # 1MB def _ensure_ingest_config( info: ls_schemas.LangSmithInfo, ) -> ls_schemas.BatchIngestConfig: default_config = ls_schemas.BatchIngestConfig( + use_multipart_endpoint=False, size_limit_bytes=None, # Note this field is not used here size_limit=100, scale_up_nthreads_limit=_AUTO_SCALE_UP_NTHREADS_LIMIT, @@ -5620,6 +5642,7 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: size_limit: int = batch_ingest_config["size_limit"] scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"] scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] + use_multipart: bool = batch_ingest_config["use_multipart_endpoint"] sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached @@ -5641,21 +5664,24 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: ): new_thread = threading.Thread( target=_tracing_sub_thread_func, - args=(weakref.ref(client),), + args=(weakref.ref(client), use_multipart), ) sub_threads.append(new_thread) new_thread.start() if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit): - _tracing_thread_handle_batch(client, tracing_queue, next_batch) + _tracing_thread_handle_batch( + client, tracing_queue, next_batch, use_multipart + ) # drain the queue on exit while next_batch := _tracing_thread_drain_queue( tracing_queue, limit=size_limit, block=False ): - _tracing_thread_handle_batch(client, tracing_queue, next_batch) + _tracing_thread_handle_batch(client, tracing_queue, next_batch, use_multipart) def _tracing_sub_thread_func( client_ref: weakref.ref[Client], + use_multipart: bool, ) -> None: client = client_ref() if client is None: @@ -5682,7 +5708,9 @@ def _tracing_sub_thread_func( ): if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit): seen_successive_empty_queues = 0 - _tracing_thread_handle_batch(client, tracing_queue, next_batch) + _tracing_thread_handle_batch( + client, tracing_queue, next_batch, use_multipart + ) else: seen_successive_empty_queues += 1 @@ 
-5690,7 +5718,7 @@ def _tracing_sub_thread_func(
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False
     ):
-        _tracing_thread_handle_batch(client, tracing_queue, next_batch)
+        _tracing_thread_handle_batch(client, tracing_queue, next_batch, use_multipart)
 
 
 def convert_prompt_to_openai_format(
diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py
index 33bb11c40..3a69e4d6f 100644
--- a/python/langsmith/schemas.py
+++ b/python/langsmith/schemas.py
@@ -12,6 +12,7 @@
     List,
     Optional,
     Protocol,
+    Tuple,
     Union,
     runtime_checkable,
 )
@@ -43,6 +44,9 @@
 SCORE_TYPE = Union[StrictBool, StrictInt, StrictFloat, None]
 VALUE_TYPE = Union[Dict, str, None]
 
+Attachments = Dict[str, Tuple[str, bytes]]
+"""Attachments associated with the run. Each entry is a tuple of (mime_type, bytes)."""
+
 
 class ExampleBase(BaseModel):
     """Example base model."""
@@ -318,6 +322,9 @@ class Run(RunBase):
     """  # noqa: E501
     in_dataset: Optional[bool] = None
     """Whether this run is in a dataset."""
+    attachments: Attachments = Field(default_factory=dict)
+    """Attachments associated with the run.
+    Each entry is a tuple of (mime_type, bytes)."""
     _host_url: Optional[str] = PrivateAttr(default=None)
 
     def __init__(self, _host_url: Optional[str] = None, **kwargs: Any) -> None:
@@ -376,6 +383,7 @@ class RunLikeDict(TypedDict, total=False):
     output_attachments: Optional[dict]
     trace_id: UUID
     dotted_order: str
+    attachments: Attachments
 
 
 class RunWithAnnotationQueueInfo(RunBase):
@@ -637,6 +645,8 @@ class AnnotationQueue(BaseModel):
 class BatchIngestConfig(TypedDict, total=False):
     """Configuration for batch ingestion."""
 
+    use_multipart_endpoint: bool
+    """Whether to use the multipart endpoint for batch ingestion."""
     scale_up_qsize_trigger: int
     """The queue size threshold that triggers scaling up."""
     scale_up_nthreads_limit: int

From a20c94cbeaf82e874aeba46136a9793f9041452c Mon Sep 17 00:00:00 2001
From: Nuno Campos
Date: Tue, 1 Oct 2024 13:39:47 -0700
Subject: [PATCH 04/18] Fix up

---
 python/langsmith/client.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index 10097ee3a..62331996f 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -5642,7 +5642,7 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
     size_limit: int = batch_ingest_config["size_limit"]
     scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"]
     scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"]
-    use_multipart: bool = batch_ingest_config["use_multipart_endpoint"]
+    use_multipart: bool = batch_ingest_config.get("use_multipart_endpoint", False)
 
     sub_threads: List[threading.Thread] = []
     # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached

From f345d2c585ad10169234f64785d2d203030ed453 Mon Sep 17 00:00:00 2001
From: Nuno Campos
Date: Tue, 1 Oct 2024 13:33:44 -0700
Subject: [PATCH 05/18] Implement "streaming" multipart requests to multipart endpoint

- Use streaming multipart encoder from requests_toolbelt
- Currently dump each part to JSON before sending the request, as that's
  the only way to enforce the payload size limit
- When we lift the payload size limit we should implement true streaming
  encoding, where each part is encoded only immediately before being sent
  over the connection, and use transfer-encoding: chunked
---
 python/langsmith/client.py | 169 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 168 insertions(+), 1 deletion(-)

diff --git
a/python/langsmith/client.py b/python/langsmith/client.py index 62331996f..5c0f8228e 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -63,6 +63,7 @@ import orjson import requests from requests import adapters as requests_adapters +from requests_toolbelt.multipart import MultipartEncoder from typing_extensions import TypeGuard from urllib3.util import Retry @@ -92,6 +93,8 @@ class ZoneInfo: # type: ignore[no-redef] X_API_KEY = "x-api-key" WARNED_ATTACHMENTS = False EMPTY_SEQ: tuple[Dict, ...] = () +BOUNDARY = uuid.uuid4().hex +MultipartParts = List[Tuple[str, Tuple[None, bytes, str]]] def _parse_token_or_url( @@ -1538,6 +1541,167 @@ def _post_batch_ingest_runs(self, body: bytes, *, _context: str): except Exception: logger.warning(f"Failed to batch ingest runs: {repr(e)}") + def multipart_ingest_runs( + self, + create: Optional[ + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]] + ] = None, + update: Optional[ + Sequence[Union[ls_schemas.Run, ls_schemas.RunLikeDict, Dict]] + ] = None, + *, + pre_sampled: bool = False, + ): + """Batch ingest/upsert multiple runs in the Langsmith system. + + Args: + create (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): + A sequence of `Run` objects or equivalent dictionaries representing + runs to be created / posted. + update (Optional[Sequence[Union[ls_schemas.Run, RunLikeDict]]]): + A sequence of `Run` objects or equivalent dictionaries representing + runs that have already been created and should be updated / patched. + pre_sampled (bool, optional): Whether the runs have already been subject + to sampling, and therefore should not be sampled again. + Defaults to False. + + Returns: + None: If both `create` and `update` are None. + + Raises: + LangsmithAPIError: If there is an error in the API request. + + Note: + - The run objects MUST contain the dotted_order and trace_id fields + to be accepted by the API. + """ + if not create and not update: + return + # transform and convert to dicts + all_attachments: Dict[str, ls_schemas.Attachments] = {} + create_dicts = [ + self._run_transform(run, attachments_collector=all_attachments) + for run in create or EMPTY_SEQ + ] + update_dicts = [ + self._run_transform(run, update=True, attachments_collector=all_attachments) + for run in update or EMPTY_SEQ + ] + # combine post and patch dicts where possible + if update_dicts and create_dicts: + create_by_id = {run["id"]: run for run in create_dicts} + standalone_updates: list[dict] = [] + for run in update_dicts: + if run["id"] in create_by_id: + for k, v in run.items(): + if v is not None: + create_by_id[run["id"]][k] = v + else: + standalone_updates.append(run) + update_dicts = standalone_updates + for run in create_dicts: + if not run.get("trace_id") or not run.get("dotted_order"): + raise ls_utils.LangSmithUserError( + "Batch ingest requires trace_id and dotted_order to be set." + ) + for run in update_dicts: + if not run.get("trace_id") or not run.get("dotted_order"): + raise ls_utils.LangSmithUserError( + "Batch ingest requires trace_id and dotted_order to be set." 
+ ) + # filter out runs that are not sampled + if not pre_sampled: + create_dicts = self._filter_for_sampling(create_dicts) + update_dicts = self._filter_for_sampling(update_dicts, patch=True) + if not create_dicts and not update_dicts: + return + # insert runtime environment + self._insert_runtime_env(create_dicts) + self._insert_runtime_env(update_dicts) + # check size limit + size_limit_bytes = (self.info.batch_ingest_config or {}).get( + "size_limit_bytes" + ) or _SIZE_LIMIT_BYTES + # send the runs in multipart requests + acc_size = 0 + acc_context: List[str] = [] + acc_parts: MultipartParts = [] + for event, payloads in (("post", create_dicts), ("patch", update_dicts)): + for payload in payloads: + parts: MultipartParts = [] + # collect fields to be sent as separate parts + fields = [ + ("inputs", run.pop("inputs", None)), + ("outputs", run.pop("outputs", None)), + ("serialized", run.pop("serialized", None)), + ("events", run.pop("events", None)), + ] + # encode the main run payload + parts.append( + ( + f"{event}.{payload['id']}", + (None, _dumps_json(payload), "application/json"), + ) + ) + # encode the fields we collected + for key, value in fields: + if value is None: + continue + parts.append( + ( + f"{event}.{run['id']}.{key}", + (None, _dumps_json(value), "application/json"), + ), + ) + # encode the attachments + if attachments := all_attachments.pop(payload["id"], None): + for n, (ct, ba) in attachments.items(): + parts.append( + (f"attachment.{payload['id']}.{n}", (None, ba, ct)) + ) + # calculate the size of the parts + size = sum(len(p[1][1]) for p in parts) + # compute context + context = f"trace={payload.get('trace_id')},id={payload.get('id')}" + # if next size would exceed limit, send the current parts + if acc_size + size > size_limit_bytes: + self._send_multipart_req(acc_parts, _context="; ".join(acc_context)) + else: + # otherwise, accumulate the parts + acc_size += size + acc_parts.extend(parts) + acc_context.append(context) + # send the remaining parts + if acc_parts: + self._send_multipart_req(acc_parts, _context="; ".join(acc_context)) + + def _send_multipart_req(self, parts: MultipartParts, *, _context: str): + for api_url, api_key in self._write_api_urls.items(): + try: + encoder = MultipartEncoder(parts, boundary=BOUNDARY) + self.request_with_retries( + "POST", + f"{api_url}/runs/multipart", + request_kwargs={ + "data": encoder, + "headers": { + **self._headers, + X_API_KEY: api_key, + "Content-Type": encoder.content_type, + }, + }, + to_ignore=(ls_utils.LangSmithConflictError,), + stop_after_attempt=3, + _context=_context, + ) + except Exception as e: + try: + exc_desc_lines = traceback.format_exception_only(type(e), e) + exc_desc = "".join(exc_desc_lines).rstrip() + logger.warning(f"Failed to multipart ingest runs: {exc_desc}") + except Exception: + logger.warning(f"Failed to multipart ingest runs: {repr(e)}") + def update_run( self, run_id: ID_TYPE, @@ -5593,7 +5757,10 @@ def _tracing_thread_handle_batch( create = [it.item for it in batch if it.action == "create"] update = [it.item for it in batch if it.action == "update"] try: - client.batch_ingest_runs(create=create, update=update, pre_sampled=True) + if use_multipart: + client.multipart_ingest_runs(create=create, update=update, pre_sampled=True) + else: + client.batch_ingest_runs(create=create, update=update, pre_sampled=True) except Exception: logger.error("Error in tracing queue", exc_info=True) # exceptions are logged elsewhere, but we need to make sure the From b515cdafc261d5bce3289d0771b5f5adc3fe056a 
Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 13:44:41 -0700 Subject: [PATCH 06/18] Lint --- python/langsmith/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 5c0f8228e..228288279 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -63,7 +63,7 @@ import orjson import requests from requests import adapters as requests_adapters -from requests_toolbelt.multipart import MultipartEncoder +from requests_toolbelt.multipart import MultipartEncoder # type: ignore[import-untyped] from typing_extensions import TypeGuard from urllib3.util import Retry From 1cd973249bc8fcd1d914f8f1e07e7f9be70c55e1 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 15:30:30 -0700 Subject: [PATCH 07/18] Fix up --- python/langsmith/client.py | 51 +++--- python/poetry.lock | 16 +- python/pyproject.toml | 1 + python/tests/unit_tests/test_client.py | 227 ++++++++++++++++++------- 4 files changed, 216 insertions(+), 79 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 228288279..275969ea3 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1587,6 +1587,23 @@ def multipart_ingest_runs( self._run_transform(run, update=True, attachments_collector=all_attachments) for run in update or EMPTY_SEQ ] + # require trace_id and dotted_order + if create_dicts: + for run in create_dicts: + if not run.get("trace_id") or not run.get("dotted_order"): + raise ls_utils.LangSmithUserError( + "Batch ingest requires trace_id and dotted_order to be set." + ) + else: + del run + if update_dicts: + for run in update_dicts: + if not run.get("trace_id") or not run.get("dotted_order"): + raise ls_utils.LangSmithUserError( + "Batch ingest requires trace_id and dotted_order to be set." + ) + else: + del run # combine post and patch dicts where possible if update_dicts and create_dicts: create_by_id = {run["id"]: run for run in create_dicts} @@ -1598,17 +1615,9 @@ def multipart_ingest_runs( create_by_id[run["id"]][k] = v else: standalone_updates.append(run) + else: + del run update_dicts = standalone_updates - for run in create_dicts: - if not run.get("trace_id") or not run.get("dotted_order"): - raise ls_utils.LangSmithUserError( - "Batch ingest requires trace_id and dotted_order to be set." - ) - for run in update_dicts: - if not run.get("trace_id") or not run.get("dotted_order"): - raise ls_utils.LangSmithUserError( - "Batch ingest requires trace_id and dotted_order to be set." 
- ) # filter out runs that are not sampled if not pre_sampled: create_dicts = self._filter_for_sampling(create_dicts) @@ -1631,10 +1640,10 @@ def multipart_ingest_runs( parts: MultipartParts = [] # collect fields to be sent as separate parts fields = [ - ("inputs", run.pop("inputs", None)), - ("outputs", run.pop("outputs", None)), - ("serialized", run.pop("serialized", None)), - ("events", run.pop("events", None)), + ("inputs", payload.pop("inputs", None)), + ("outputs", payload.pop("outputs", None)), + ("serialized", payload.pop("serialized", None)), + ("events", payload.pop("events", None)), ] # encode the main run payload parts.append( @@ -1649,7 +1658,7 @@ def multipart_ingest_runs( continue parts.append( ( - f"{event}.{run['id']}.{key}", + f"{event}.{payload['id']}.{key}", (None, _dumps_json(value), "application/json"), ), ) @@ -1666,11 +1675,13 @@ def multipart_ingest_runs( # if next size would exceed limit, send the current parts if acc_size + size > size_limit_bytes: self._send_multipart_req(acc_parts, _context="; ".join(acc_context)) - else: - # otherwise, accumulate the parts - acc_size += size - acc_parts.extend(parts) - acc_context.append(context) + acc_parts.clear() + acc_context.clear() + acc_size = 0 + # accumulate the parts + acc_size += size + acc_parts.extend(parts) + acc_context.append(context) # send the remaining parts if acc_parts: self._send_multipart_req(acc_parts, _context="; ".join(acc_context)) diff --git a/python/poetry.lock b/python/poetry.lock index 4814514b9..9af73254e 100644 --- a/python/poetry.lock +++ b/python/poetry.lock @@ -685,6 +685,20 @@ files = [ [package.dependencies] typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.11\""} +[[package]] +name = "multipart" +version = "1.0.0" +description = "Parser for multipart/form-data" +optional = false +python-versions = ">=3.5" +files = [ + {file = "multipart-1.0.0-py3-none-any.whl", hash = "sha256:85824b3d48b63fe0b6f438feb2b39f9753512e889426fb339e96b6095d4239c8"}, + {file = "multipart-1.0.0.tar.gz", hash = "sha256:6ac937fe07cd4e11cf4ca199f3d8f668e6a37e0f477c5ee032673d45be7f7957"}, +] + +[package.extras] +dev = ["build", "pytest", "pytest-cov", "twine"] + [[package]] name = "mypy" version = "1.11.2" @@ -1903,4 +1917,4 @@ vcr = [] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "244ff8da405f7ae894fcee75f851ef3b51eb556e5ac56a84fc3eb4930851e75c" +content-hash = "a20ea8a3bba074fc87b54139dfe7c4c4ffea37d5d5f4b874fb759baad4d443d0" diff --git a/python/pyproject.toml b/python/pyproject.toml index 8791fbdff..700a17372 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -60,6 +60,7 @@ pytest-rerunfailures = "^14.0" pytest-socket = "^0.7.0" pyperf = "^2.7.0" py-spy = "^0.3.14" +multipart = "^1.0.0" [tool.poetry.group.lint.dependencies] openai = "^1.10" diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 2e8b2043a..4e9fa7344 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -14,7 +14,7 @@ from datetime import datetime, timezone from enum import Enum from io import BytesIO -from typing import Dict, NamedTuple, Optional, Type, Union +from typing import Dict, List, NamedTuple, Optional, Type, Union from unittest import mock from unittest.mock import MagicMock, patch @@ -22,8 +22,10 @@ import orjson import pytest import requests +from multipart import MultipartParser, MultipartPart, parse_options_header from pydantic import BaseModel from requests import 
HTTPError +from requests_toolbelt.multipart import MultipartEncoder import langsmith.env as ls_env import langsmith.utils as ls_utils @@ -63,7 +65,7 @@ def test_validate_api_url(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain-endpoint.com") monkeypatch.setenv("LANGSMITH_ENDPOINT", "https://api.smith.langsmith-endpoint.com") - client = Client() + client = Client(auto_batch_tracing=False) assert client.api_url == "https://api.smith.langsmith-endpoint.com" # Scenario 2: Both LANGCHAIN_ENDPOINT and LANGSMITH_ENDPOINT @@ -72,7 +74,11 @@ def test_validate_api_url(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain-endpoint.com") monkeypatch.setenv("LANGSMITH_ENDPOINT", "https://api.smith.langsmith-endpoint.com") - client = Client(api_url="https://api.smith.langchain.com", api_key="123") + client = Client( + api_url="https://api.smith.langchain.com", + api_key="123", + auto_batch_tracing=False, + ) assert client.api_url == "https://api.smith.langchain.com" # Scenario 3: LANGCHAIN_ENDPOINT is set, but LANGSMITH_ENDPOINT is not @@ -80,7 +86,7 @@ def test_validate_api_url(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain-endpoint.com") monkeypatch.delenv("LANGSMITH_ENDPOINT", raising=False) - client = Client() + client = Client(auto_batch_tracing=False) assert client.api_url == "https://api.smith.langchain-endpoint.com" # Scenario 4: LANGCHAIN_ENDPOINT is not set, but LANGSMITH_ENDPOINT is set @@ -88,7 +94,7 @@ def test_validate_api_url(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("LANGCHAIN_ENDPOINT", raising=False) monkeypatch.setenv("LANGSMITH_ENDPOINT", "https://api.smith.langsmith-endpoint.com") - client = Client() + client = Client(auto_batch_tracing=False) assert client.api_url == "https://api.smith.langsmith-endpoint.com" @@ -152,12 +158,13 @@ def test_validate_multiple_urls(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("LANGCHAIN_ENDPOINT", raising=False) monkeypatch.delenv("LANGSMITH_ENDPOINT", raising=False) monkeypatch.setenv("LANGSMITH_RUNS_ENDPOINTS", json.dumps(data)) - client = Client() + client = Client(auto_batch_tracing=False) assert client._write_api_urls == data assert client.api_url == "https://api.smith.langsmith-endpoint_1.com" assert client.api_key == "123" +@mock.patch("langsmith.client.requests.Session") def test_headers(monkeypatch: pytest.MonkeyPatch) -> None: _clear_env_cache() monkeypatch.delenv("LANGCHAIN_API_KEY", raising=False) @@ -276,7 +283,8 @@ def test_create_run_unicode() -> None: client.update_run(id_, status="completed") -def test_create_run_mutate() -> None: +@pytest.mark.parametrize("use_multipart_endpoint", (True, False)) +def test_create_run_mutate(use_multipart_endpoint: bool) -> None: inputs = {"messages": ["hi"], "mygen": (i for i in range(10))} session = mock.Mock() session.request = mock.Mock() @@ -286,6 +294,7 @@ def test_create_run_mutate() -> None: session=session, info=ls_schemas.LangSmithInfo( batch_ingest_config=ls_schemas.BatchIngestConfig( + use_multipart_endpoint=use_multipart_endpoint, size_limit_bytes=None, # Note this field is not used here size_limit=100, scale_up_nthreads_limit=16, @@ -315,33 +324,91 @@ def test_create_run_mutate() -> None: trace_id=id_, dotted_order=run_dict["dotted_order"], ) - for _ in range(10): - time.sleep(0.1) # Give the background thread time to stop - payloads = [ - json.loads(call[2]["data"]) - 
for call in session.request.mock_calls - if call.args and call.args[1].endswith("runs/batch") + if use_multipart_endpoint: + for _ in range(10): + time.sleep(0.1) # Give the background thread time to stop + payloads = [ + (call[2]["headers"], call[2]["data"]) + for call in session.request.mock_calls + if call.args and call.args[1].endswith("runs/multipart") + ] + if payloads: + break + else: + assert False, "No payloads found" + + parts: List[MultipartPart] = [] + for payload in payloads: + headers, data = payload + assert headers["Content-Type"].startswith("multipart/form-data") + # this is a current implementation detail, if we change implementation + # we update this assertion + assert isinstance(data, MultipartEncoder) + boundary = parse_options_header(headers["Content-Type"])[1]["boundary"] + parser = MultipartParser(data, boundary) + parts.extend(parser.parts()) + + assert len(parts) == 3 + assert [p.name for p in parts] == [ + f"post.{id_}", + f"post.{id_}.inputs", + f"post.{id_}.outputs", ] - if payloads: - break - posts = [pr for payload in payloads for pr in payload.get("post", [])] - patches = [pr for payload in payloads for pr in payload.get("patch", [])] - inputs = next( - (pr["inputs"] for pr in itertools.chain(posts, patches) if pr.get("inputs")), - {}, - ) - outputs = next( - (pr["outputs"] for pr in itertools.chain(posts, patches) if pr.get("outputs")), - {}, - ) - # Check that the mutated value wasn't posted - assert "messages" in inputs - assert inputs["messages"] == ["hi"] - assert "mygen" in inputs - assert inputs["mygen"].startswith( # type: ignore - "." - ) - assert outputs == {"messages": ["hi", "there"]} + assert [p.headers.get("content-type") for p in parts] == [ + "application/json", + "application/json", + "application/json", + ] + outputs_parsed = json.loads(parts[2].value) + assert outputs_parsed == outputs + inputs_parsed = json.loads(parts[1].value) + assert inputs_parsed["messages"] == ["hi"] + assert inputs_parsed["mygen"].startswith( # type: ignore + "." + ) + run_parsed = json.loads(parts[0].value) + assert "inputs" not in run_parsed + assert "outputs" not in run_parsed + assert run_parsed["trace_id"] == str(id_) + assert run_parsed["dotted_order"] == run_dict["dotted_order"] + else: + for _ in range(10): + time.sleep(0.1) # Give the background thread time to stop + payloads = [ + json.loads(call[2]["data"]) + for call in session.request.mock_calls + if call.args and call.args[1].endswith("runs/batch") + ] + if payloads: + break + else: + assert False, "No payloads found" + posts = [pr for payload in payloads for pr in payload.get("post", [])] + patches = [pr for payload in payloads for pr in payload.get("patch", [])] + inputs = next( + ( + pr["inputs"] + for pr in itertools.chain(posts, patches) + if pr.get("inputs") + ), + {}, + ) + outputs = next( + ( + pr["outputs"] + for pr in itertools.chain(posts, patches) + if pr.get("outputs") + ), + {}, + ) + # Check that the mutated value wasn't posted + assert "messages" in inputs + assert inputs["messages"] == ["hi"] + assert "mygen" in inputs + assert inputs["mygen"].startswith( # type: ignore + "." 
+ ) + assert outputs == {"messages": ["hi", "there"]} class CallTracker: @@ -951,7 +1018,10 @@ def test_batch_ingest_run_retry_on_429(mock_raise_for_status): @pytest.mark.parametrize("payload_size", [MB, 5 * MB, 9 * MB, 21 * MB]) -def test_batch_ingest_run_splits_large_batches(payload_size: int): +@pytest.mark.parametrize("use_multipart_endpoint", (True, False)) +def test_batch_ingest_run_splits_large_batches( + payload_size: int, use_multipart_endpoint: bool +): mock_session = MagicMock() client = Client(api_key="test", session=mock_session) mock_response = MagicMock() @@ -981,36 +1051,76 @@ def test_batch_ingest_run_splits_large_batches(payload_size: int): } for run_id in patch_ids ] - client.batch_ingest_runs(create=posts, update=patches) - # we can support up to 20MB per batch, so we need to find the number of batches - # we should be sending - max_in_batch = max(1, (20 * MB) // (payload_size + 20)) + if use_multipart_endpoint: + client.multipart_ingest_runs(create=posts, update=patches) + # we can support up to 20MB per batch, so we need to find the number of batches + # we should be sending + max_in_batch = max(1, (20 * MB) // (payload_size + 20)) + + expected_num_requests = min(6, math.ceil((len(run_ids) * 2) / max_in_batch)) + # count the number of POST requests + assert sum( + [1 for call in mock_session.request.call_args_list if call[0][0] == "POST"] + ) in (expected_num_requests, expected_num_requests + 1) + request_bodies = [ + op + for call in mock_session.request.call_args_list + for op in ( + MultipartParser( + call[1]["data"], + parse_options_header(call[1]["headers"]["Content-Type"])[1][ + "boundary" + ], + ) + if call[0][0] == "POST" + else [] + ) + ] + all_run_ids = run_ids + patch_ids - expected_num_requests = min(6, math.ceil((len(run_ids) * 2) / max_in_batch)) - # count the number of POST requests - assert ( - sum([1 for call in mock_session.request.call_args_list if call[0][0] == "POST"]) - == expected_num_requests - ) - request_bodies = [ - op - for call in mock_session.request.call_args_list - for reqs in ( - orjson.loads(call[1]["data"]).values() if call[0][0] == "POST" else [] + # Check that all the run_ids are present in the request bodies + for run_id in all_run_ids: + assert any( + [body.name.split(".")[1] == run_id for body in request_bodies] + ), run_id + else: + client.batch_ingest_runs(create=posts, update=patches) + # we can support up to 20MB per batch, so we need to find the number of batches + # we should be sending + max_in_batch = max(1, (20 * MB) // (payload_size + 20)) + + expected_num_requests = min(6, math.ceil((len(run_ids) * 2) / max_in_batch)) + # count the number of POST requests + assert ( + sum( + [ + 1 + for call in mock_session.request.call_args_list + if call[0][0] == "POST" + ] + ) + == expected_num_requests ) - for op in reqs - ] - all_run_ids = run_ids + patch_ids + request_bodies = [ + op + for call in mock_session.request.call_args_list + for reqs in ( + orjson.loads(call[1]["data"]).values() if call[0][0] == "POST" else [] + ) + for op in reqs + ] + all_run_ids = run_ids + patch_ids - # Check that all the run_ids are present in the request bodies - for run_id in all_run_ids: - assert any([body["id"] == str(run_id) for body in request_bodies]) + # Check that all the run_ids are present in the request bodies + for run_id in all_run_ids: + assert any([body["id"] == str(run_id) for body in request_bodies]) - # Check that no duplicate run_ids are present in the request bodies - assert len(request_bodies) == len(set([body["id"] for body 
in request_bodies])) + # Check that no duplicate run_ids are present in the request bodies + assert len(request_bodies) == len(set([body["id"] for body in request_bodies])) -def test_select_eval_results(): +@mock.patch("langsmith.client.requests.Session") +def test_select_eval_results(mock_session_cls: mock.Mock): expected = EvaluationResult( key="foo", value="bar", @@ -1050,6 +1160,7 @@ def test_select_eval_results(): @pytest.mark.parametrize("client_cls", [Client, AsyncClient]) +@mock.patch("langsmith.client.requests.Session") def test_validate_api_key_if_hosted( monkeypatch: pytest.MonkeyPatch, client_cls: Union[Type[Client], Type[AsyncClient]] ) -> None: From a217db7cb2580f56337e6f385336f35a5711220f Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 15:49:03 -0700 Subject: [PATCH 08/18] Add FF --- python/langsmith/client.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 275969ea3..f64e62e43 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -5820,7 +5820,10 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: size_limit: int = batch_ingest_config["size_limit"] scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"] scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] - use_multipart: bool = batch_ingest_config.get("use_multipart_endpoint", False) + use_multipart: bool = os.getenv( + "LANGSMITH_FF_MULTIPART", + batch_ingest_config.get("use_multipart_endpoint", False), + ) sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached From 454622b8cf3f38eaddf4e03dcaf3af26974c57db Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 15:49:11 -0700 Subject: [PATCH 09/18] Add integration test --- python/tests/integration_tests/test_client.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/python/tests/integration_tests/test_client.py b/python/tests/integration_tests/test_client.py index 87c2c6f94..be4f27147 100644 --- a/python/tests/integration_tests/test_client.py +++ b/python/tests/integration_tests/test_client.py @@ -615,7 +615,10 @@ def test_create_chat_example( langchain_client.delete_dataset(dataset_id=dataset.id) -def test_batch_ingest_runs(langchain_client: Client) -> None: +@pytest.mark.parametrize("use_multipart_endpoint", [True, False]) +def test_batch_ingest_runs( + langchain_client: Client, use_multipart_endpoint: bool +) -> None: _session = "__test_batch_ingest_runs" trace_id = uuid4() trace_id_2 = uuid4() @@ -669,7 +672,12 @@ def test_batch_ingest_runs(langchain_client: Client) -> None: "outputs": {"output1": 4, "output2": 5}, }, ] - langchain_client.batch_ingest_runs(create=runs_to_create, update=runs_to_update) + if use_multipart_endpoint: + langchain_client.multipart_ingest_runs( + create=runs_to_create, update=runs_to_update + ) + else: + langchain_client.batch_ingest_runs(create=runs_to_create, update=runs_to_update) runs = [] wait = 4 for _ in range(15): From d8d582faaecc6cd9e1021c525dc05e491d1eef0d Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 16:01:38 -0700 Subject: [PATCH 10/18] Fix for urllib<2 --- python/langsmith/client.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index f64e62e43..0250fdc4a 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -39,6 +39,7 @@ 
import warnings import weakref from dataclasses import dataclass, field +from inspect import signature from queue import Empty, PriorityQueue, Queue from typing import ( TYPE_CHECKING, @@ -65,6 +66,7 @@ from requests import adapters as requests_adapters from requests_toolbelt.multipart import MultipartEncoder # type: ignore[import-untyped] from typing_extensions import TypeGuard +from urllib3.poolmanager import PoolKey from urllib3.util import Retry import langsmith @@ -95,6 +97,7 @@ class ZoneInfo: # type: ignore[no-redef] EMPTY_SEQ: tuple[Dict, ...] = () BOUNDARY = uuid.uuid4().hex MultipartParts = List[Tuple[str, Tuple[None, bytes, str]]] +URLLIB3_SUPPORTS_BLOCKSIZE = "key_blocksize" in signature(PoolKey).parameters def _parse_token_or_url( @@ -462,7 +465,9 @@ def __init__( super().__init__(pool_connections, pool_maxsize, max_retries, pool_block) def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs): - pool_kwargs["blocksize"] = self._blocksize + if URLLIB3_SUPPORTS_BLOCKSIZE: + # urllib3 before 2.0 doesn't support blocksize + pool_kwargs["blocksize"] = self._blocksize return super().init_poolmanager(connections, maxsize, block, **pool_kwargs) From 69dc1e27e3d4876835a3abe06763bf7e93c13064 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 16:04:21 -0700 Subject: [PATCH 11/18] Lint --- python/langsmith/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 0250fdc4a..8d5cde35e 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -66,7 +66,7 @@ from requests import adapters as requests_adapters from requests_toolbelt.multipart import MultipartEncoder # type: ignore[import-untyped] from typing_extensions import TypeGuard -from urllib3.poolmanager import PoolKey +from urllib3.poolmanager import PoolKey # type: ignore[attr-defined] from urllib3.util import Retry import langsmith From 08ec720e8f1de3d848aaed8656546646a3524c1f Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 16:07:07 -0700 Subject: [PATCH 12/18] Lint --- python/langsmith/client.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 8d5cde35e..19a0d09dc 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -5825,10 +5825,10 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: size_limit: int = batch_ingest_config["size_limit"] scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"] scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] - use_multipart: bool = os.getenv( - "LANGSMITH_FF_MULTIPART", - batch_ingest_config.get("use_multipart_endpoint", False), - ) + if multipart_override := os.getenv("LANGSMITH_FF_MULTIPART"): + use_multipart = multipart_override.lower() in ["1", "true"] + else: + use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached From a1f100e235b8fb3401a180fb2dafd3e4cdca7cbc Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 16:39:42 -0700 Subject: [PATCH 13/18] Fix --- python/langsmith/utils.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/langsmith/utils.py b/python/langsmith/utils.py index bf6068004..b63c65b52 100644 --- a/python/langsmith/utils.py +++ b/python/langsmith/utils.py @@ -146,9 +146,6 @@ def raise_for_status_with_text( 
except requests.HTTPError as e: raise requests.HTTPError(str(e), response.text) from e # type: ignore[call-arg] - except httpx.HTTPError as e: - raise httpx.HTTPError(str(e), response.text) from e # type: ignore[call-arg] - def get_enum_value(enu: Union[enum.Enum, str]) -> str: """Get the value of a string enum.""" From 12c110fab9900ebe1ffcd4e94d9d89a8db4db4b5 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 16:39:45 -0700 Subject: [PATCH 14/18] Fix --- python/langsmith/client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 19a0d09dc..c21f1f2b5 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -5825,10 +5825,9 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: size_limit: int = batch_ingest_config["size_limit"] scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"] scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] - if multipart_override := os.getenv("LANGSMITH_FF_MULTIPART"): - use_multipart = multipart_override.lower() in ["1", "true"] - else: - use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) + use_multipart = os.getenv("LANGSMITH_FF_MULTIPART") in ["1", "true"] + # use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) + # TODO replace FF with reading from batch_ingest_config sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached From 2ceeebfb9c6eab82279722c42c4ba143c7577c6a Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 16:44:13 -0700 Subject: [PATCH 15/18] Fix test --- python/tests/unit_tests/test_client.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 4e9fa7344..13b73628b 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -284,7 +284,12 @@ def test_create_run_unicode() -> None: @pytest.mark.parametrize("use_multipart_endpoint", (True, False)) -def test_create_run_mutate(use_multipart_endpoint: bool) -> None: +def test_create_run_mutate( + use_multipart_endpoint: bool, monkeypatch: pytest.MonkeyPatch +) -> None: + if use_multipart_endpoint: + monkeypatch.setenv("LANGSMITH_FF_MULTIPART", "true") + # TODO remove this when removing FF inputs = {"messages": ["hi"], "mygen": (i for i in range(10))} session = mock.Mock() session.request = mock.Mock() From 89510e7da806e4cbb2e619fb4bc16762e6097db9 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 17:10:04 -0700 Subject: [PATCH 16/18] Apply suggestions from code review Co-authored-by: William FH <13333726+hinthornw@users.noreply.github.com> --- python/langsmith/client.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index c21f1f2b5..e99164831 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1255,7 +1255,8 @@ def _run_transform( logger.warning( "You're trying to submit a run with attachments, but your current" " LangSmith integration doesn't support it. Please contact the " - " LangChain team for assitance on how to upgrade." + " LangChain team at support at langchain" + " dot dev for assistance on how to upgrade." 
) return run_create @@ -1597,7 +1598,8 @@ def multipart_ingest_runs( for run in create_dicts: if not run.get("trace_id") or not run.get("dotted_order"): raise ls_utils.LangSmithUserError( - "Batch ingest requires trace_id and dotted_order to be set." + "Multipart ingest requires trace_id and dotted_order" + " to be set in create dicts." ) else: del run @@ -1605,7 +1607,8 @@ def multipart_ingest_runs( for run in update_dicts: if not run.get("trace_id") or not run.get("dotted_order"): raise ls_utils.LangSmithUserError( - "Batch ingest requires trace_id and dotted_order to be set." + "Multipart ingest requires trace_id and dotted_order" + " to be set in update dicts." ) else: del run From e68c424d58ead2b55d85096b1339bd89a99e6f96 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 17:13:45 -0700 Subject: [PATCH 17/18] Fix --- python/langsmith/client.py | 11 ++++++----- python/langsmith/schemas.py | 7 ++++--- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index e99164831..8c36c7aea 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1409,7 +1409,7 @@ def batch_ingest_runs( ] = None, *, pre_sampled: bool = False, - ): + ) -> None: """Batch ingest/upsert multiple runs in the Langsmith system. Args: @@ -1424,7 +1424,7 @@ def batch_ingest_runs( Defaults to False. Returns: - None: If both `create` and `update` are None. + None Raises: LangsmithAPIError: If there is an error in the API request. @@ -1557,7 +1557,7 @@ def multipart_ingest_runs( ] = None, *, pre_sampled: bool = False, - ): + ) -> None: """Batch ingest/upsert multiple runs in the Langsmith system. Args: @@ -1572,7 +1572,7 @@ def multipart_ingest_runs( Defaults to False. Returns: - None: If both `create` and `update` are None. + None Raises: LangsmithAPIError: If there is an error in the API request. @@ -5795,6 +5795,7 @@ def _tracing_thread_handle_batch( _AUTO_SCALE_UP_NTHREADS_LIMIT = 16 _AUTO_SCALE_DOWN_NEMPTY_TRIGGER = 4 _BLOCKSIZE_BYTES = 1024 * 1024 # 1MB +_USE_MULTIPART = os.getenv("LANGSMITH_FF_MULTIPART") in ["1", "true"] def _ensure_ingest_config( @@ -5828,7 +5829,7 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: size_limit: int = batch_ingest_config["size_limit"] scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"] scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] - use_multipart = os.getenv("LANGSMITH_FF_MULTIPART") in ["1", "true"] + use_multipart = _USE_MULTIPART # use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) # TODO replace FF with reading from batch_ingest_config diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py index 3a69e4d6f..fb4e13ef6 100644 --- a/python/langsmith/schemas.py +++ b/python/langsmith/schemas.py @@ -254,6 +254,10 @@ class RunBase(BaseModel): tags: Optional[List[str]] = None """Tags for categorizing or annotating the run.""" + attachments: Attachments = Field(default_factory=dict) + """Attachments associated with the run. + Each entry is a tuple of (mime_type, bytes).""" + _lock: threading.Lock = PrivateAttr(default_factory=threading.Lock) @property @@ -322,9 +326,6 @@ class Run(RunBase): """ # noqa: E501 in_dataset: Optional[bool] = None """Whether this run is in a dataset.""" - attachments: Attachments = Field(default_factory=dict) - """Attachments associated with the run. 
- Each entry is a tuple of (mime_type, bytes).""" _host_url: Optional[str] = PrivateAttr(default=None) def __init__(self, _host_url: Optional[str] = None, **kwargs: Any) -> None: From 39eeeef22350168e9b8e54f7677825b1db1825a4 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 1 Oct 2024 17:25:37 -0700 Subject: [PATCH 18/18] Actually don't cache FF --- python/langsmith/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 8c36c7aea..d0c57f8a2 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -5795,7 +5795,6 @@ def _tracing_thread_handle_batch( _AUTO_SCALE_UP_NTHREADS_LIMIT = 16 _AUTO_SCALE_DOWN_NEMPTY_TRIGGER = 4 _BLOCKSIZE_BYTES = 1024 * 1024 # 1MB -_USE_MULTIPART = os.getenv("LANGSMITH_FF_MULTIPART") in ["1", "true"] def _ensure_ingest_config( @@ -5829,7 +5828,7 @@ def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: size_limit: int = batch_ingest_config["size_limit"] scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"] scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] - use_multipart = _USE_MULTIPART + use_multipart = os.getenv("LANGSMITH_FF_MULTIPART") in ["1", "true"] # use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) # TODO replace FF with reading from batch_ingest_config
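
---
The sketches below are illustrative reviewer notes, not part of the patch
series itself.

First, the session-mounting fix in PATCH 02: a requests.Session is created
with default adapters already mounted for both URL prefixes, so the old
"only mount if missing" guard never fired and the retry-configured adapter
was silently ignored. A minimal sketch using only public requests APIs (the
16 + 1 pool size mirrors _AUTO_SCALE_UP_NTHREADS_LIMIT plus one slot for
foreground requests):

    import requests
    from requests.adapters import HTTPAdapter

    session = requests.Session()
    # A fresh Session already has adapters mounted for both prefixes, so a
    # "mount only if missing" check can never trigger.
    assert session.get_adapter("http://") is not None
    assert session.get_adapter("https://") is not None

    # Size the pool to cover every batch-tracing thread plus one slot for
    # foreground requests, so no thread blocks waiting for a connection.
    adapter = HTTPAdapter(pool_maxsize=16 + 1)
    session.mount("https://", adapter)
    session.mount("http://", adapter)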
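Second, the wire format PATCH 05/07 assemble for POST /runs/multipart: the
main run payload travels as a part named {event}.{run_id}; inputs, outputs,
serialized, and events are split out into parts named
{event}.{run_id}.{field}; and each attachment becomes
attachment.{run_id}.{name} with its own content type. A hedged sketch (the
JSON payloads and attachment bytes here are invented placeholders):

    import uuid

    from requests_toolbelt.multipart import MultipartEncoder

    run_id = uuid.uuid4()
    parts = [
        # main run payload, with the large fields split out below
        (f"post.{run_id}", (None, b'{"name": "example"}', "application/json")),
        # each split-out field is its own JSON part
        (f"post.{run_id}.inputs", (None, b'{"q": "hi"}', "application/json")),
        # attachments are raw bytes tagged with their own mime type
        (f"attachment.{run_id}.screenshot", (None, b"\x89PNG\r\n", "image/png")),
    ]
    encoder = MultipartEncoder(parts, boundary=uuid.uuid4().hex)
    print(encoder.content_type)  # multipart/form-data; boundary=<hex>
    body = encoder.to_string()   # the fully encoded request body (bytes)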
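Finally, a hedged usage sketch of the new client surface. The payload
fields and the timestamp-plus-run-id dotted_order convention are
assumptions drawn from the tests in this series; attachments follow the
Attachments = Dict[str, Tuple[str, bytes]] alias added to schemas.py. Note
that LANGSMITH_FF_MULTIPART only routes background batch tracing to the new
endpoint; calling multipart_ingest_runs directly always uses it.

    import os
    import uuid
    from datetime import datetime, timezone

    from langsmith import Client

    # Opt background batch tracing into /runs/multipart (PATCH 08/14/18).
    os.environ["LANGSMITH_FF_MULTIPART"] = "true"

    client = Client(api_key="<your-api-key>", auto_batch_tracing=False)

    run_id = uuid.uuid4()
    start = datetime.now(timezone.utc)
    run = {
        "id": str(run_id),
        "name": "example_run",
        "run_type": "chain",
        "start_time": start,
        "inputs": {"question": "hi"},
        # multipart ingest requires trace_id and dotted_order on every run
        "trace_id": str(run_id),
        "dotted_order": f"{start.strftime('%Y%m%dT%H%M%S%fZ')}{run_id}",
        # each attachment is (mime_type, bytes) and becomes its own part
        "attachments": {"screenshot": ("image/png", b"\x89PNG\r\n")},
    }
    client.multipart_ingest_runs(create=[run])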