From 9087b2ecc24cb83c404b483c92694d584e87c27b Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 6 Jan 2025 14:47:55 -0800 Subject: [PATCH 01/24] feat(python): Enable compression by default --- python/langsmith/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 5dae4a38c..77fb5e372 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -476,12 +476,12 @@ def __init__( # Create a session and register a finalizer to close it session_ = session if session else requests.Session() self.session = session_ - if ls_utils.get_env_var("USE_RUN_COMPRESSION"): + if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION"): + self.compressed_runs = None + else: self._futures: set[cf.Future] = set() self.compressed_runs: Optional[CompressedRuns] = CompressedRuns() self._data_available_event = threading.Event() - else: - self.compressed_runs = None self._info = ( info From bcb5ffd3cb83ef4bfb60c5a57bbef3946909fa11 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 6 Jan 2025 14:51:34 -0800 Subject: [PATCH 02/24] update deps --- python/langsmith/_internal/_compressed_runs.py | 17 +---------------- python/poetry.lock | 9 ++++----- python/pyproject.toml | 3 +-- 3 files changed, 6 insertions(+), 23 deletions(-) diff --git a/python/langsmith/_internal/_compressed_runs.py b/python/langsmith/_internal/_compressed_runs.py index e4f0f8f92..19a512876 100644 --- a/python/langsmith/_internal/_compressed_runs.py +++ b/python/langsmith/_internal/_compressed_runs.py @@ -1,12 +1,7 @@ import io import threading -try: - from zstandard import ZstdCompressor # type: ignore[import] - - HAVE_ZSTD = True -except ImportError: - HAVE_ZSTD = False +from zstandard import ZstdCompressor # type: ignore[import] from langsmith import utils as ls_utils @@ -20,11 +15,6 @@ def __init__(self): self.lock = threading.Lock() self.uncompressed_size = 0 - if not HAVE_ZSTD: - raise ImportError( - "zstandard package required for compression. " - "Install with 'pip install langsmith[compression]'" - ) self.compressor_writer = ZstdCompressor( level=compression_level, threads=-1 ).stream_writer(self.buffer, closefd=False) @@ -34,11 +24,6 @@ def reset(self): self.run_count = 0 self.uncompressed_size = 0 - if not HAVE_ZSTD: - raise ImportError( - "zstandard package required for compression. " - "Install with 'pip install langsmith[compression]'" - ) self.compressor_writer = ZstdCompressor( level=compression_level, threads=-1 ).stream_writer(self.buffer, closefd=False) diff --git a/python/poetry.lock b/python/poetry.lock index a31c97af7..9e7b33838 100644 --- a/python/poetry.lock +++ b/python/poetry.lock @@ -113,7 +113,7 @@ files = [ name = "cffi" version = "1.17.1" description = "Foreign Function Interface for Python calling C code." -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "cffi-1.17.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:df8b1c11f177bc2313ec4b2d46baec87a5f3e71fc8b45dab2ee7cae86d9aba14"}, @@ -1280,7 +1280,7 @@ files = [ name = "pycparser" version = "2.22" description = "C parser in Python" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"}, @@ -2200,7 +2200,7 @@ propcache = ">=0.2.0" name = "zstandard" version = "0.23.0" description = "Zstandard bindings for Python" -optional = true +optional = false python-versions = ">=3.8" files = [ {file = "zstandard-0.23.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:bf0a05b6059c0528477fba9054d09179beb63744355cab9f38059548fedd46a9"}, @@ -2309,11 +2309,10 @@ cffi = {version = ">=1.11", markers = "platform_python_implementation == \"PyPy\ cffi = ["cffi (>=1.11)"] [extras] -compression = ["zstandard"] langsmith-pyo3 = ["langsmith-pyo3"] vcr = [] [metadata] lock-version = "2.0" python-versions = ">=3.9,<4.0" -content-hash = "19b288e10f9d6c040798efb74ae2778103abdc87c298b8c3eb21f7685db56642" +content-hash = "8ea9d9d23cb55e70b8982c4b6d5292cc715b00ee1bd9b0f2cf6bffc59e9bbf54" diff --git a/python/pyproject.toml b/python/pyproject.toml index e02abadd9..356e7f7ec 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -38,7 +38,6 @@ requests-toolbelt = "^1.0.0" # Enabled via `langsmith_pyo3` extra: `pip install langsmith[langsmith_pyo3]`. langsmith-pyo3 = { version = "^0.1.0rc2", optional = true } # Enabled via `compression` extra: `pip install langsmith[compression]`. -zstandard = { version = "^0.23.0", optional = true } [tool.poetry.group.dev.dependencies] pytest = "^7.3.1" @@ -66,6 +65,7 @@ pytest-socket = "^0.7.0" pyperf = "^2.7.0" py-spy = "^0.3.14" multipart = "^1.0.0" +zstandard = "^0.23.0" [tool.poetry.group.lint.dependencies] openai = "^1.10" @@ -77,7 +77,6 @@ pytest-socket = "^0.7.0" [tool.poetry.extras] vcr = ["vcrpy"] langsmith_pyo3 = ["langsmith-pyo3"] -compression = ["zstandard"] [build-system] requires = ["poetry-core"] From ec73127f1fdb9282f56d5a97c93239b760f869f8 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 6 Jan 2025 14:54:24 -0800 Subject: [PATCH 03/24] mypy --- python/langsmith/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 77fb5e372..2f6ee31cb 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -477,10 +477,10 @@ def __init__( session_ = session if session else requests.Session() self.session = session_ if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION"): - self.compressed_runs = None + self.compressed_runs: Optional[CompressedRuns] = None else: self._futures: set[cf.Future] = set() - self.compressed_runs: Optional[CompressedRuns] = CompressedRuns() + self.compressed_runs = CompressedRuns() self._data_available_event = threading.Event() self._info = ( From 3fb3d5dba9556b3fa528048f40f4a1ab89168913 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Tue, 7 Jan 2025 11:11:14 -0800 Subject: [PATCH 04/24] feat(python): Enable compression only for multipart --- python/langsmith/client.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 2f6ee31cb..b480f859f 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -476,18 +476,22 @@ def __init__( # Create a session and register a finalizer to close it session_ = session if session else requests.Session() self.session = session_ - if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION"): + self._info = ( + info + if info is None or isinstance(info, ls_schemas.LangSmithInfo) + else ls_schemas.LangSmithInfo(**info) + ) + + use_multipart = (self.info.batch_ingest_config or {}).get( + "use_multipart_endpoint", False + ) + if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") and not use_multipart: self.compressed_runs: Optional[CompressedRuns] = None else: self._futures: set[cf.Future] = set() self.compressed_runs = CompressedRuns() self._data_available_event = threading.Event() - self._info = ( - info - if info is None or isinstance(info, ls_schemas.LangSmithInfo) - else ls_schemas.LangSmithInfo(**info) - ) weakref.finalize(self, close_session, self.session) atexit.register(close_session, session_) # Initialize auto batching From 95eccf8d1f7d1be517d562676bc0f84b4c42f165 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Tue, 7 Jan 2025 11:12:34 -0800 Subject: [PATCH 05/24] feat(python): Enable compression for multipart only --- python/langsmith/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index b480f859f..a46caa94c 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -485,7 +485,7 @@ def __init__( use_multipart = (self.info.batch_ingest_config or {}).get( "use_multipart_endpoint", False ) - if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") and not use_multipart: + if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") and use_multipart: self.compressed_runs: Optional[CompressedRuns] = None else: self._futures: set[cf.Future] = set() From eae4ccf4b6e69324d6c686043cfe29dfaeb65fe6 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Tue, 7 Jan 2025 11:13:45 -0800 Subject: [PATCH 06/24] feat(python): fix multipart logic --- python/langsmith/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index a46caa94c..10c65bbea 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -481,11 +481,10 @@ def __init__( if info is None or isinstance(info, ls_schemas.LangSmithInfo) else ls_schemas.LangSmithInfo(**info) ) - use_multipart = (self.info.batch_ingest_config or {}).get( "use_multipart_endpoint", False ) - if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") and use_multipart: + if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") or not use_multipart: self.compressed_runs: Optional[CompressedRuns] = None else: self._futures: set[cf.Future] = set() From c2426243c4c5421da2da5c688dfcd01bd7b9f50f Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Tue, 7 Jan 2025 15:47:11 -0800 Subject: [PATCH 07/24] move compressed runs to background thread --- .../langsmith/_internal/_background_thread.py | 12 +++++++++ python/langsmith/client.py | 26 +++---------------- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index f817e27d4..f28ac301e 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -32,6 +32,8 @@ combine_serialized_queue_operations, ) +from langsmith._internal._compressed_runs import CompressedRuns + if TYPE_CHECKING: from langsmith.client import Client @@ -214,6 +216,16 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) + if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") is False and use_multipart: + client._futures = set() + client.compressed_runs = CompressedRuns() + client._data_available_event = threading.Event() + if client.info.version and ls_utils.is_version_greater_or_equal(client.info.version, "0.8.10"): # TODO(angus): update this version + threading.Thread( + target=tracing_control_thread_func_compress_parallel, + args=(weakref.ref(client),), + ).start() + sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached num_known_refs = 3 diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 10c65bbea..065469aa5 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -77,11 +77,7 @@ from langsmith._internal._background_thread import ( tracing_control_thread_func as _tracing_control_thread_func, ) -from langsmith._internal._background_thread import ( - tracing_control_thread_func_compress_parallel as _tracing_control_thread_func_compress_parallel, -) from langsmith._internal._beta_decorator import warn_beta -from langsmith._internal._compressed_runs import CompressedRuns from langsmith._internal._constants import ( _AUTO_SCALE_UP_NTHREADS_LIMIT, _BLOCKSIZE_BYTES, @@ -481,29 +477,12 @@ def __init__( if info is None or isinstance(info, ls_schemas.LangSmithInfo) else ls_schemas.LangSmithInfo(**info) ) - use_multipart = (self.info.batch_ingest_config or {}).get( - "use_multipart_endpoint", False - ) - if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") or not use_multipart: - self.compressed_runs: Optional[CompressedRuns] = None - else: - self._futures: set[cf.Future] = set() - self.compressed_runs = CompressedRuns() - self._data_available_event = threading.Event() - weakref.finalize(self, close_session, self.session) atexit.register(close_session, session_) # Initialize auto batching - if auto_batch_tracing and self.compressed_runs is not None: - self.tracing_queue: Optional[PriorityQueue] = None - threading.Thread( - target=_tracing_control_thread_func_compress_parallel, - # arg must be a weakref to self to avoid the Thread object - # preventing garbage collection of the Client object - args=(weakref.ref(self),), - ).start() - elif auto_batch_tracing: + if auto_batch_tracing: self.tracing_queue = PriorityQueue() + self.compressed_runs = None threading.Thread( target=_tracing_control_thread_func, @@ -513,6 +492,7 @@ def __init__( ).start() else: self.tracing_queue = None + self.compressed_runs = None # Mount the HTTPAdapter with the retry configuration. adapter = _LangSmithHttpAdapter( From 4beb54e3a78219944347cd7aca12020083e9c751 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Tue, 7 Jan 2025 16:14:44 -0800 Subject: [PATCH 08/24] feat(python):check for instance flag --- .../langsmith/_internal/_background_thread.py | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index f28ac301e..f8bef0ac9 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -20,6 +20,7 @@ from langsmith import schemas as ls_schemas from langsmith import utils as ls_utils +from langsmith._internal._compressed_runs import CompressedRuns from langsmith._internal._constants import ( _AUTO_SCALE_DOWN_NEMPTY_TRIGGER, _AUTO_SCALE_UP_NTHREADS_LIMIT, @@ -32,8 +33,6 @@ combine_serialized_queue_operations, ) -from langsmith._internal._compressed_runs import CompressedRuns - if TYPE_CHECKING: from langsmith.client import Client @@ -217,14 +216,21 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") is False and use_multipart: + if not (client.info.instance_flags or {}).get( + "zstd_compression_enabled", False + ): + raise ValueError( + "Zstd compression is not enabled. Please update to the latest " + "version of LangSmith or set the environment variable " + "DISABLE_RUN_COMPRESSION=true." + ) client._futures = set() client.compressed_runs = CompressedRuns() client._data_available_event = threading.Event() - if client.info.version and ls_utils.is_version_greater_or_equal(client.info.version, "0.8.10"): # TODO(angus): update this version - threading.Thread( - target=tracing_control_thread_func_compress_parallel, - args=(weakref.ref(client),), - ).start() + threading.Thread( + target=tracing_control_thread_func_compress_parallel, + args=(weakref.ref(client),), + ).start() sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached From b4cb53e05cc7bd4964958e64006679d7348e1bea Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Tue, 7 Jan 2025 16:43:52 -0800 Subject: [PATCH 09/24] feat(python): add instance flag check --- python/langsmith/_internal/_background_thread.py | 4 +++- python/langsmith/client.py | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index f8bef0ac9..fae9e3c1b 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -215,7 +215,7 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) - if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") is False and use_multipart: + if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") is None and use_multipart: if not (client.info.instance_flags or {}).get( "zstd_compression_enabled", False ): @@ -274,6 +274,7 @@ def keep_thread_active() -> bool: _tracing_thread_handle_batch( client, tracing_queue, next_batch, use_multipart ) + # drain the queue on exit while next_batch := _tracing_thread_drain_queue( tracing_queue, limit=size_limit, block=False @@ -284,6 +285,7 @@ def keep_thread_active() -> bool: def tracing_control_thread_func_compress_parallel( client_ref: weakref.ref[Client], ) -> None: + print("tracing_control_thread_func_compress_parallel") client = client_ref() if client is None: return diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 065469aa5..dcaa4ef85 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -100,6 +100,7 @@ serialized_run_operation_to_multipart_parts_and_context, ) from langsmith._internal._serde import dumps_json as _dumps_json +from python.langsmith._internal._compressed_runs import CompressedRuns try: from zoneinfo import ZoneInfo # type: ignore[import-not-found] @@ -482,7 +483,7 @@ def __init__( # Initialize auto batching if auto_batch_tracing: self.tracing_queue = PriorityQueue() - self.compressed_runs = None + self.compressed_runs: Optional[CompressedRuns] = None threading.Thread( target=_tracing_control_thread_func, @@ -1273,6 +1274,8 @@ def create_run( run_create.get("trace_id") is not None and run_create.get("dotted_order") is not None ): + print("creating run") + print("compressed runs", self.compressed_runs) if self._pyo3_client is not None: self._pyo3_client.create_run(run_create) elif self.compressed_runs is not None: From 57109f0236fc5c82100ad3271c3ff1817e6c5eef Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Tue, 7 Jan 2025 16:45:09 -0800 Subject: [PATCH 10/24] fix(python): remove print --- python/langsmith/client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index dcaa4ef85..4605cd1ee 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1274,8 +1274,6 @@ def create_run( run_create.get("trace_id") is not None and run_create.get("dotted_order") is not None ): - print("creating run") - print("compressed runs", self.compressed_runs) if self._pyo3_client is not None: self._pyo3_client.create_run(run_create) elif self.compressed_runs is not None: From 344091623b05b1b209f2d4bbad9eadc2e496aee7 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Tue, 7 Jan 2025 21:47:22 -0800 Subject: [PATCH 11/24] feat(python):Add asserts for mypy --- python/langsmith/_internal/_background_thread.py | 5 ++++- python/langsmith/client.py | 13 +++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index fae9e3c1b..5721f6dc8 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -285,11 +285,14 @@ def keep_thread_active() -> bool: def tracing_control_thread_func_compress_parallel( client_ref: weakref.ref[Client], ) -> None: - print("tracing_control_thread_func_compress_parallel") client = client_ref() if client is None: return + assert client.compressed_runs is not None + assert client._data_available_event is not None + assert client._futures is not None + batch_ingest_config = _ensure_ingest_config(client.info) size_limit: int = batch_ingest_config["size_limit"] size_limit_bytes = batch_ingest_config.get("size_limit_bytes", 20_971_520) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 4605cd1ee..42c173658 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -78,6 +78,7 @@ tracing_control_thread_func as _tracing_control_thread_func, ) from langsmith._internal._beta_decorator import warn_beta +from langsmith._internal._compressed_runs import CompressedRuns from langsmith._internal._constants import ( _AUTO_SCALE_UP_NTHREADS_LIMIT, _BLOCKSIZE_BYTES, @@ -100,7 +101,6 @@ serialized_run_operation_to_multipart_parts_and_context, ) from langsmith._internal._serde import dumps_json as _dumps_json -from python.langsmith._internal._compressed_runs import CompressedRuns try: from zoneinfo import ZoneInfo # type: ignore[import-not-found] @@ -480,10 +480,12 @@ def __init__( ) weakref.finalize(self, close_session, self.session) atexit.register(close_session, session_) + self.compressed_runs: Optional[CompressedRuns] = None + self._data_available_event: Optional[threading.Event] = None + self._futures: Optional[set[cf.Future]] = None # Initialize auto batching if auto_batch_tracing: - self.tracing_queue = PriorityQueue() - self.compressed_runs: Optional[CompressedRuns] = None + self.tracing_queue: Optional[PriorityQueue] = PriorityQueue() threading.Thread( target=_tracing_control_thread_func, @@ -493,7 +495,6 @@ def __init__( ).start() else: self.tracing_queue = None - self.compressed_runs = None # Mount the HTTPAdapter with the retry configuration. adapter = _LangSmithHttpAdapter( @@ -1277,6 +1278,7 @@ def create_run( if self._pyo3_client is not None: self._pyo3_client.create_run(run_create) elif self.compressed_runs is not None: + assert self._data_available_event is not None serialized_op = serialize_run_dict("post", run_create) multipart_form = ( serialized_run_operation_to_multipart_parts_and_context( @@ -1979,6 +1981,7 @@ def update_run( ) ) with self.compressed_runs.lock: + assert self._data_available_event is not None compress_multipart_parts_and_context( multipart_form, self.compressed_runs, @@ -2014,6 +2017,8 @@ def flush_compressed_runs(self, attempts: int = 3) -> None: if self.compressed_runs is None: return + assert self._futures is not None + # Attempt to drain and send any remaining data from langsmith._internal._background_thread import ( HTTP_REQUEST_THREAD_POOL, From f0ec00c6262ccedd65a4dbdc648e9bfcc1369792 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Tue, 7 Jan 2025 22:02:18 -0800 Subject: [PATCH 12/24] feat(python): Periodically flush buffer --- .../langsmith/_internal/_background_thread.py | 69 ++++++++++++++----- 1 file changed, 51 insertions(+), 18 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index 5721f6dc8..3f6e50382 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -6,6 +6,7 @@ import logging import sys import threading +import time import weakref from multiprocessing import cpu_count from queue import Empty, Queue @@ -283,7 +284,7 @@ def keep_thread_active() -> bool: def tracing_control_thread_func_compress_parallel( - client_ref: weakref.ref[Client], + client_ref: weakref.ref[Client], flush_interval: float = 0.5 ) -> None: client = client_ref() if client is None: @@ -323,35 +324,67 @@ def keep_thread_active() -> bool: # for now, keep thread alive return True + last_flush_time = time.monotonic() + while True: triggered = client._data_available_event.wait(timeout=0.05) if not keep_thread_active(): break - if not triggered: - continue - client._data_available_event.clear() - data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer( - client, size_limit, size_limit_bytes - ) + # If data arrived, clear the event and attempt a drain + if triggered: + client._data_available_event.clear() - if data_stream is not None: - try: - future = HTTP_REQUEST_THREAD_POOL.submit( - client._send_compressed_multipart_req, - data_stream, - compressed_runs_info, + data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer( + client, size_limit, size_limit_bytes + ) + # If we have data, submit the send request + if data_stream is not None: + try: + future = HTTP_REQUEST_THREAD_POOL.submit( + client._send_compressed_multipart_req, + data_stream, + compressed_runs_info, + ) + client._futures.add(future) + except RuntimeError: + client._send_compressed_multipart_req( + data_stream, + compressed_runs_info, + ) + last_flush_time = time.monotonic() + + else: + if (time.monotonic() - last_flush_time) >= flush_interval: + data_stream, compressed_runs_info = ( + _tracing_thread_drain_compressed_buffer( + client, size_limit=1, size_limit_bytes=1 + ) ) - client._futures.add(future) - except RuntimeError: - client._send_compressed_multipart_req(data_stream, compressed_runs_info) + if data_stream is not None: + try: + cf.wait( + [ + HTTP_REQUEST_THREAD_POOL.submit( + client._send_compressed_multipart_req, + data_stream, + compressed_runs_info, + ) + ] + ) + except RuntimeError: + client._send_compressed_multipart_req( + data_stream, + compressed_runs_info, + ) + last_flush_time = time.monotonic() - # Drain the buffer on exit + # Drain the buffer on exit (final flush) try: final_data_stream, compressed_runs_info = ( _tracing_thread_drain_compressed_buffer( client, size_limit=1, size_limit_bytes=1 - ) # Force final drain + ) ) if final_data_stream is not None: try: From 02b17ca762ecbfa1d36f4ee45d6e7660453b9fb8 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Wed, 8 Jan 2025 09:44:07 -0800 Subject: [PATCH 13/24] feat(python): Revert back to regular tracing if compression not enabled --- .../langsmith/_internal/_background_thread.py | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index 3f6e50382..9f85f1cd6 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -102,7 +102,8 @@ def _tracing_thread_drain_queue( def _tracing_thread_drain_compressed_buffer( client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520 ) -> Tuple[Optional[io.BytesIO], Optional[Tuple[int, int]]]: - assert client.compressed_runs is not None + if client.compressed_runs is None: + return None, None with client.compressed_runs.lock: client.compressed_runs.compressor_writer.flush() current_size = client.compressed_runs.buffer.tell() @@ -220,18 +221,20 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: if not (client.info.instance_flags or {}).get( "zstd_compression_enabled", False ): - raise ValueError( + logger.warning( "Zstd compression is not enabled. Please update to the latest " - "version of LangSmith or set the environment variable " - "DISABLE_RUN_COMPRESSION=true." + "version of LangSmith." ) - client._futures = set() - client.compressed_runs = CompressedRuns() - client._data_available_event = threading.Event() - threading.Thread( - target=tracing_control_thread_func_compress_parallel, - args=(weakref.ref(client),), - ).start() + else: + client._futures = set() + client.compressed_runs = CompressedRuns() + client._data_available_event = threading.Event() + threading.Thread( + target=tracing_control_thread_func_compress_parallel, + args=(weakref.ref(client),), + ).start() + + return sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached @@ -290,9 +293,13 @@ def tracing_control_thread_func_compress_parallel( if client is None: return - assert client.compressed_runs is not None - assert client._data_available_event is not None - assert client._futures is not None + if ( + client.compressed_runs is None + or client._data_available_event is None + or client._futures is None + ): + logger.error("Required compression attributes not initialized") + return batch_ingest_config = _ensure_ingest_config(client.info) size_limit: int = batch_ingest_config["size_limit"] From 46eea0feb3cb251de2846567e59a323aa51e1c7e Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Wed, 8 Jan 2025 13:02:23 -0800 Subject: [PATCH 14/24] keep tracing thread alive --- python/langsmith/_internal/_background_thread.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index 9f85f1cd6..992c83e3f 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -216,7 +216,7 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"] scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) - + if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") is None and use_multipart: if not (client.info.instance_flags or {}).get( "zstd_compression_enabled", False @@ -234,8 +234,6 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: args=(weakref.ref(client),), ).start() - return - sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached num_known_refs = 3 From 6d3c9f5b69a03e3b2ae133afd7bdafcc71751f75 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Wed, 8 Jan 2025 13:05:00 -0800 Subject: [PATCH 15/24] format --- python/langsmith/_internal/_background_thread.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index 992c83e3f..079b2b4c9 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -216,7 +216,7 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: scale_up_nthreads_limit: int = batch_ingest_config["scale_up_nthreads_limit"] scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) - + if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") is None and use_multipart: if not (client.info.instance_flags or {}).get( "zstd_compression_enabled", False From 61c74bbb050c7b990f29d03018f260cf443220f5 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Thu, 9 Jan 2025 09:18:35 -0800 Subject: [PATCH 16/24] Add compression unit test --- python/tests/unit_tests/test_client.py | 70 ++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 939aa9ad2..9de9de1a3 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -2069,3 +2069,73 @@ def test_evaluate_methods() -> None: eval_args = set(inspect.signature(aevaluate).parameters).difference({"client"}) extra_args = client_args - eval_args assert not extra_args + + +@patch("langsmith.client.requests.Session") +def test_create_run_with_zstd_compression(mock_session_cls: mock.Mock) -> None: + """Test that runs are sent using zstd compression when compression is enabled.""" + # Prepare a mocked session + mock_session = MagicMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_session.request.return_value = mock_response + mock_session_cls.return_value = mock_session + + with patch.dict("os.environ", {}, clear=True): + info = ls_schemas.LangSmithInfo( + version="0.6.0", + instance_flags={"zstd_compression_enabled": True}, + batch_ingest_config=ls_schemas.BatchIngestConfig( + use_multipart_endpoint=True, + size_limit=1, + size_limit_bytes=128, + scale_up_nthreads_limit=4, + scale_up_qsize_trigger=3, + scale_down_nempty_trigger=1, + ), + ) + client = Client( + api_url="http://localhost:1984", + api_key="123", + auto_batch_tracing=True, + session=mock_session, + info=info, + ) + + # Create a few runs with larger payloads so there's something to compress + for i in range(2): + run_id = uuid.uuid4() + client.create_run( + name=f"my_test_run_{i}", + run_type="llm", + inputs={"some_key": "some_val" * 1000}, # bigger input + id=run_id, + trace_id=run_id, + dotted_order=str(run_id), + ) + + # Let the background threads flush + if client.tracing_queue: + client.tracing_queue.join() + if client._futures is not None: + for fut in client._futures: + fut.result() + + # Inspect the calls + post_calls = [] + for call_obj in mock_session.request.mock_calls: + if call_obj.args and call_obj.args[0] == "POST": + post_calls.append(call_obj) + assert len(post_calls) >= 1, "Expected at least one POST to the compression endpoint" + + + call_data = post_calls[0][2]["data"] + + if hasattr(call_data, "read"): + call_data = call_data.read() + + zstd_magic = b"\x28\xb5\x2f\xfd" + assert call_data.startswith(zstd_magic), ( + "Expected the request body to start with zstd magic bytes; " + "it appears runs were not compressed." + ) \ No newline at end of file From d67d1c901931b016ddfa2c9a3683c68d76318582 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Fri, 10 Jan 2025 14:31:10 -0800 Subject: [PATCH 17/24] format tests --- python/tests/unit_tests/test_client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 9de9de1a3..1ff7b530c 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -2126,8 +2126,9 @@ def test_create_run_with_zstd_compression(mock_session_cls: mock.Mock) -> None: for call_obj in mock_session.request.mock_calls: if call_obj.args and call_obj.args[0] == "POST": post_calls.append(call_obj) - assert len(post_calls) >= 1, "Expected at least one POST to the compression endpoint" - + assert ( + len(post_calls) >= 1 + ), "Expected at least one POST to the compression endpoint" call_data = post_calls[0][2]["data"] @@ -2138,4 +2139,4 @@ def test_create_run_with_zstd_compression(mock_session_cls: mock.Mock) -> None: assert call_data.startswith(zstd_magic), ( "Expected the request body to start with zstd magic bytes; " "it appears runs were not compressed." - ) \ No newline at end of file + ) From 8c4b3c66a5c0d8d88bb58f75344013f6eec5c0d0 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 13 Jan 2025 11:50:01 -0800 Subject: [PATCH 18/24] address comments --- python/langsmith/_internal/_background_thread.py | 10 +++++++--- python/langsmith/client.py | 15 ++++++++++++--- python/langsmith/utils.py | 12 ++++++++++++ python/tests/unit_tests/test_client.py | 4 +++- 4 files changed, 34 insertions(+), 7 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index 079b2b4c9..4cfb73622 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -217,13 +217,14 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"] use_multipart = batch_ingest_config.get("use_multipart_endpoint", False) - if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") is None and use_multipart: + disable_compression = ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") + if not ls_utils.is_truish(disable_compression) and use_multipart: if not (client.info.instance_flags or {}).get( "zstd_compression_enabled", False ): logger.warning( - "Zstd compression is not enabled. Please update to the latest " - "version of LangSmith." + "Run compression is not enabled. Please update to the latest " + "version of LangSmith. Falling back to regular multipart ingestion." ) else: client._futures = set() @@ -234,6 +235,9 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: args=(weakref.ref(client),), ).start() + # client.tracing_queue.join() + return + sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached num_known_refs = 3 diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 42c173658..ac103d06b 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1278,7 +1278,10 @@ def create_run( if self._pyo3_client is not None: self._pyo3_client.create_run(run_create) elif self.compressed_runs is not None: - assert self._data_available_event is not None + if self._data_available_event is None: + raise ValueError( + "Run compression is enabled but threading event is not configured" + ) serialized_op = serialize_run_dict("post", run_create) multipart_form = ( serialized_run_operation_to_multipart_parts_and_context( @@ -1981,7 +1984,10 @@ def update_run( ) ) with self.compressed_runs.lock: - assert self._data_available_event is not None + if self._data_available_event is None: + raise ValueError( + "Run compression is enabled but threading event is not configured" + ) compress_multipart_parts_and_context( multipart_form, self.compressed_runs, @@ -2017,7 +2023,10 @@ def flush_compressed_runs(self, attempts: int = 3) -> None: if self.compressed_runs is None: return - assert self._futures is not None + if self._futures is None: + raise ValueError( + "Run compression is enabled but request pool futures is not set" + ) # Attempt to drain and send any remaining data from langsmith._internal._background_thread import ( diff --git a/python/langsmith/utils.py b/python/langsmith/utils.py index 3e8956d1a..440ad1a5b 100644 --- a/python/langsmith/utils.py +++ b/python/langsmith/utils.py @@ -795,3 +795,15 @@ def _get_function_name(fn: Callable, depth: int = 0) -> str: return _get_function_name(fn.__call__, depth + 1) return str(fn) + + +def is_truish(val: Any) -> bool: + """Check if the value is truish. + + Args: + val (Any): The value to check. + + Returns: + bool: True if the value is truish, False otherwise. + """ + return val is True or val == "true" or val == "True" or val == "TRUE" diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 1ff7b530c..23c0342f7 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -2102,13 +2102,15 @@ def test_create_run_with_zstd_compression(mock_session_cls: mock.Mock) -> None: info=info, ) + time.sleep(1) + # Create a few runs with larger payloads so there's something to compress for i in range(2): run_id = uuid.uuid4() client.create_run( name=f"my_test_run_{i}", run_type="llm", - inputs={"some_key": "some_val" * 1000}, # bigger input + inputs={"some_key": "some_val" * 1000}, id=run_id, trace_id=run_id, dotted_order=str(run_id), From 23080d32b22bd3b48ed30e9b796e4abea253da0d Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 13 Jan 2025 12:22:26 -0800 Subject: [PATCH 19/24] Add unit tests for disabled compression in instance flags and env vars --- python/tests/unit_tests/test_client.py | 198 +++++++++++++++++++++++++ 1 file changed, 198 insertions(+) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 23c0342f7..93b71d42a 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -26,8 +26,10 @@ import pytest import requests from multipart import MultipartParser, MultipartPart, parse_options_header +from requests_toolbelt import MultipartEncoder from pydantic import BaseModel from requests import HTTPError +import zstandard as zstd import langsmith.env as ls_env import langsmith.utils as ls_utils @@ -2142,3 +2144,199 @@ def test_create_run_with_zstd_compression(mock_session_cls: mock.Mock) -> None: "Expected the request body to start with zstd magic bytes; " "it appears runs were not compressed." ) + +@patch("langsmith.client.requests.Session") +def test_create_run_without_compression_support(mock_session_cls: mock.Mock) -> None: + """Test that runs use regular multipart when server doesn't support compression.""" + mock_session = MagicMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_session.request.return_value = mock_response + mock_session_cls.return_value = mock_session + + with patch.dict("os.environ", {}, clear=True): + info = ls_schemas.LangSmithInfo( + version="0.6.0", + instance_flags={}, # No compression flag + batch_ingest_config=ls_schemas.BatchIngestConfig( + use_multipart_endpoint=True, + size_limit=1, + size_limit_bytes=128, + scale_up_nthreads_limit=4, + scale_up_qsize_trigger=3, + scale_down_nempty_trigger=1, + ), + ) + client = Client( + api_url="http://localhost:1984", + api_key="123", + auto_batch_tracing=True, + session=mock_session, + info=info, + ) + + run_id = uuid.uuid4() + inputs = {"key": "there"} + client.create_run( + name="test_run", + run_type="llm", + inputs=inputs, + id=run_id, + trace_id=run_id, + dotted_order=str(run_id), + ) + + outputs = {"key": "hi there"} + + client.update_run( + run_id, + outputs=outputs, + end_time=datetime.now(timezone.utc), + trace_id=run_id, + dotted_order=str(run_id), + ) + + if client.tracing_queue: + client.tracing_queue.join() + + time.sleep(0.1) + + post_calls = [ + call_obj for call_obj in mock_session.request.mock_calls + if call_obj.args and call_obj.args[0] == "POST" + ] + assert len(post_calls) >= 1 + + payloads = [ + (call[2]["headers"], call[2]["data"]) + for call in mock_session.request.mock_calls + if call.args and call.args[1].endswith("runs/multipart") + ] + if not payloads: + assert False, "No payloads found" + + parts: List[MultipartPart] = [] + for payload in payloads: + headers, data = payload + assert headers["Content-Type"].startswith("multipart/form-data") + assert isinstance(data, bytes) + boundary = parse_options_header(headers["Content-Type"])[1]["boundary"] + parser = MultipartParser(io.BytesIO(data), boundary) + parts.extend(parser.parts()) + + assert [p.name for p in parts] == [ + f"post.{run_id}", + f"post.{run_id}.inputs", + f"post.{run_id}.outputs", + ] + assert [p.headers.get("content-type") for p in parts] == [ + "application/json", + "application/json", + "application/json", + ] + + outputs_parsed = json.loads(parts[2].value) + assert outputs_parsed == outputs + inputs_parsed = json.loads(parts[1].value) + assert inputs_parsed == inputs + run_parsed = json.loads(parts[0].value) + assert run_parsed["trace_id"] == str(run_id) + assert run_parsed["dotted_order"] == str(run_id) + +@patch("langsmith.client.requests.Session") +def test_create_run_with_disabled_compression(mock_session_cls: mock.Mock) -> None: + """Test that runs use regular multipart when compression is explicitly disabled.""" + mock_session = MagicMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_session.request.return_value = mock_response + mock_session_cls.return_value = mock_session + + with patch.dict("os.environ", {"LANGSMITH_DISABLE_RUN_COMPRESSION": "true"}, clear=True): + info = ls_schemas.LangSmithInfo( + version="0.6.0", + instance_flags={"zstd_compression_enabled": True}, # Enabled on server + batch_ingest_config=ls_schemas.BatchIngestConfig( + use_multipart_endpoint=True, + size_limit=1, + size_limit_bytes=128, + scale_up_nthreads_limit=4, + scale_up_qsize_trigger=3, + scale_down_nempty_trigger=1, + ), + ) + client = Client( + api_url="http://localhost:1984", + api_key="123", + auto_batch_tracing=True, + session=mock_session, + info=info, + ) + + run_id = uuid.uuid4() + inputs = {"key": "there"} + client.create_run( + name="test_run", + run_type="llm", + inputs=inputs, + id=run_id, + trace_id=run_id, + dotted_order=str(run_id), + ) + + outputs = {"key": "hi there"} + client.update_run( + run_id, + outputs=outputs, + end_time=datetime.now(timezone.utc), + trace_id=run_id, + dotted_order=str(run_id), + ) + + # Let the background threads flush + if client.tracing_queue: + client.tracing_queue.join() + + time.sleep(0.1) + + post_calls = [ + call_obj for call_obj in mock_session.request.mock_calls + if call_obj.args and call_obj.args[0] == "POST" + ] + assert len(post_calls) >= 1 + + payloads = [ + (call[2]["headers"], call[2]["data"]) + for call in mock_session.request.mock_calls + if call.args and call.args[1].endswith("runs/multipart") + ] + if not payloads: + assert False, "No payloads found" + + parts: List[MultipartPart] = [] + for payload in payloads: + headers, data = payload + assert headers["Content-Type"].startswith("multipart/form-data") + assert isinstance(data, bytes) + boundary = parse_options_header(headers["Content-Type"])[1]["boundary"] + parser = MultipartParser(io.BytesIO(data), boundary) + parts.extend(parser.parts()) + + assert [p.name for p in parts] == [ + f"post.{run_id}", + f"post.{run_id}.inputs", + f"post.{run_id}.outputs", + ] + assert [p.headers.get("content-type") for p in parts] == [ + "application/json", + "application/json", + "application/json", + ] + + outputs_parsed = json.loads(parts[2].value) + assert outputs_parsed == outputs + inputs_parsed = json.loads(parts[1].value) + assert inputs_parsed == inputs + run_parsed = json.loads(parts[0].value) + assert run_parsed["trace_id"] == str(run_id) + assert run_parsed["dotted_order"] == str(run_id) From 3560088b3c2f9ca44818a0be18887197495713e8 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 13 Jan 2025 12:47:16 -0800 Subject: [PATCH 20/24] lint --- python/tests/unit_tests/test_client.py | 34 ++++++++++++++------------ 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 93b71d42a..3be2f1940 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -26,10 +26,8 @@ import pytest import requests from multipart import MultipartParser, MultipartPart, parse_options_header -from requests_toolbelt import MultipartEncoder from pydantic import BaseModel from requests import HTTPError -import zstandard as zstd import langsmith.env as ls_env import langsmith.utils as ls_utils @@ -2145,6 +2143,7 @@ def test_create_run_with_zstd_compression(mock_session_cls: mock.Mock) -> None: "it appears runs were not compressed." ) + @patch("langsmith.client.requests.Session") def test_create_run_without_compression_support(mock_session_cls: mock.Mock) -> None: """Test that runs use regular multipart when server doesn't support compression.""" @@ -2202,11 +2201,12 @@ def test_create_run_without_compression_support(mock_session_cls: mock.Mock) -> time.sleep(0.1) post_calls = [ - call_obj for call_obj in mock_session.request.mock_calls + call_obj + for call_obj in mock_session.request.mock_calls if call_obj.args and call_obj.args[0] == "POST" ] assert len(post_calls) >= 1 - + payloads = [ (call[2]["headers"], call[2]["data"]) for call in mock_session.request.mock_calls @@ -2225,10 +2225,10 @@ def test_create_run_without_compression_support(mock_session_cls: mock.Mock) -> parts.extend(parser.parts()) assert [p.name for p in parts] == [ - f"post.{run_id}", - f"post.{run_id}.inputs", - f"post.{run_id}.outputs", - ] + f"post.{run_id}", + f"post.{run_id}.inputs", + f"post.{run_id}.outputs", + ] assert [p.headers.get("content-type") for p in parts] == [ "application/json", "application/json", @@ -2243,6 +2243,7 @@ def test_create_run_without_compression_support(mock_session_cls: mock.Mock) -> assert run_parsed["trace_id"] == str(run_id) assert run_parsed["dotted_order"] == str(run_id) + @patch("langsmith.client.requests.Session") def test_create_run_with_disabled_compression(mock_session_cls: mock.Mock) -> None: """Test that runs use regular multipart when compression is explicitly disabled.""" @@ -2252,7 +2253,9 @@ def test_create_run_with_disabled_compression(mock_session_cls: mock.Mock) -> No mock_session.request.return_value = mock_response mock_session_cls.return_value = mock_session - with patch.dict("os.environ", {"LANGSMITH_DISABLE_RUN_COMPRESSION": "true"}, clear=True): + with patch.dict( + "os.environ", {"LANGSMITH_DISABLE_RUN_COMPRESSION": "true"}, clear=True + ): info = ls_schemas.LangSmithInfo( version="0.6.0", instance_flags={"zstd_compression_enabled": True}, # Enabled on server @@ -2300,11 +2303,12 @@ def test_create_run_with_disabled_compression(mock_session_cls: mock.Mock) -> No time.sleep(0.1) post_calls = [ - call_obj for call_obj in mock_session.request.mock_calls + call_obj + for call_obj in mock_session.request.mock_calls if call_obj.args and call_obj.args[0] == "POST" ] assert len(post_calls) >= 1 - + payloads = [ (call[2]["headers"], call[2]["data"]) for call in mock_session.request.mock_calls @@ -2323,10 +2327,10 @@ def test_create_run_with_disabled_compression(mock_session_cls: mock.Mock) -> No parts.extend(parser.parts()) assert [p.name for p in parts] == [ - f"post.{run_id}", - f"post.{run_id}.inputs", - f"post.{run_id}.outputs", - ] + f"post.{run_id}", + f"post.{run_id}.inputs", + f"post.{run_id}.outputs", + ] assert [p.headers.get("content-type") for p in parts] == [ "application/json", "application/json", From 1eaa9d0532f93b3c81a29194507c767405546357 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 13 Jan 2025 12:48:47 -0800 Subject: [PATCH 21/24] add 1 to is truish --- python/langsmith/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/langsmith/utils.py b/python/langsmith/utils.py index 440ad1a5b..d18222909 100644 --- a/python/langsmith/utils.py +++ b/python/langsmith/utils.py @@ -806,4 +806,4 @@ def is_truish(val: Any) -> bool: Returns: bool: True if the value is truish, False otherwise. """ - return val is True or val == "true" or val == "True" or val == "TRUE" + return val is True or val == "true" or val == "True" or val == "TRUE" or val == "1" From 36232c320c8adbfabaa7fca5065f0faa72bf67e0 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 13 Jan 2025 13:09:51 -0800 Subject: [PATCH 22/24] fix test - reset cache --- python/tests/unit_tests/test_client.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 3be2f1940..69d6cff83 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -2102,8 +2102,6 @@ def test_create_run_with_zstd_compression(mock_session_cls: mock.Mock) -> None: info=info, ) - time.sleep(1) - # Create a few runs with larger payloads so there's something to compress for i in range(2): run_id = uuid.uuid4() @@ -2123,6 +2121,8 @@ def test_create_run_with_zstd_compression(mock_session_cls: mock.Mock) -> None: for fut in client._futures: fut.result() + time.sleep(0.1) + # Inspect the calls post_calls = [] for call_obj in mock_session.request.mock_calls: @@ -2247,6 +2247,10 @@ def test_create_run_without_compression_support(mock_session_cls: mock.Mock) -> @patch("langsmith.client.requests.Session") def test_create_run_with_disabled_compression(mock_session_cls: mock.Mock) -> None: """Test that runs use regular multipart when compression is explicitly disabled.""" + + # Clear the cache to ensure the environment variable is re-evaluated + ls_utils.get_env_var.cache_clear() + mock_session = MagicMock() mock_response = MagicMock() mock_response.status_code = 200 From 20034ae81f880d947803685f7ad516313b2e7a98 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 13 Jan 2025 13:10:36 -0800 Subject: [PATCH 23/24] remove cmt --- python/langsmith/_internal/_background_thread.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index 4cfb73622..929bbc955 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -235,7 +235,6 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: args=(weakref.ref(client),), ).start() - # client.tracing_queue.join() return sub_threads: List[threading.Thread] = [] From 8b9e4dffa08b1227629e93cdedd7e838d10533dc Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Mon, 13 Jan 2025 16:50:19 -0800 Subject: [PATCH 24/24] remove return --- python/langsmith/_internal/_background_thread.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index 929bbc955..85c99d46d 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -235,8 +235,6 @@ def tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None: args=(weakref.ref(client),), ).start() - return - sub_threads: List[threading.Thread] = [] # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached num_known_refs = 3