feat(python): Enable zstd compression by default #1384

Merged
merged 25 commits into from
Jan 14, 2025
9087b2e
feat(python): Enable compression by default
angus-langchain Jan 6, 2025
bcb5ffd
update deps
angus-langchain Jan 6, 2025
ec73127
mypy
angus-langchain Jan 6, 2025
3fb3d5d
feat(python): Enable compression only for multipart
angus-langchain Jan 7, 2025
95eccf8
feat(python): Enable compression for multipart only
angus-langchain Jan 7, 2025
eae4ccf
feat(python): fix multipart logic
angus-langchain Jan 7, 2025
3fe4a29
Merge branch 'main' into angus/enable-compression
angus-langchain Jan 7, 2025
c242624
move compressed runs to background thread
angus-langchain Jan 7, 2025
4beb54e
feat(python):check for instance flag
angus-langchain Jan 8, 2025
b4cb53e
feat(python): add instance flag check
angus-langchain Jan 8, 2025
57109f0
fix(python): remove print
angus-langchain Jan 8, 2025
3440916
feat(python):Add asserts for mypy
angus-langchain Jan 8, 2025
f0ec00c
feat(python): Periodically flush buffer
angus-langchain Jan 8, 2025
02b17ca
feat(python): Revert back to regular tracing if compression not enabled
angus-langchain Jan 8, 2025
46eea0f
keep tracing thread alive
angus-langchain Jan 8, 2025
6d3c9f5
format
angus-langchain Jan 8, 2025
61c74bb
Add compression unit test
angus-langchain Jan 9, 2025
d67d1c9
format tests
angus-langchain Jan 10, 2025
8c4b3c6
address comments
angus-langchain Jan 13, 2025
23080d3
Add unit tests for disabled compression in instance flags and env vars
angus-langchain Jan 13, 2025
3560088
lint
angus-langchain Jan 13, 2025
1eaa9d0
add 1 to is truish
angus-langchain Jan 13, 2025
36232c3
fix test - reset cache
angus-langchain Jan 13, 2025
20034ae
remove cmt
angus-langchain Jan 13, 2025
8b9e4df
remove return
angus-langchain Jan 14, 2025
address comments
angus-langchain committed Jan 13, 2025
commit 8c4b3c66a5c0d8d88bb58f75344013f6eec5c0d0
10 changes: 7 additions & 3 deletions python/langsmith/_internal/_background_thread.py
@@ -1,4 +1,4 @@
from __future__ import annotations

Check notice on line 1 in python/langsmith/_internal/_background_thread.py

GitHub Actions / benchmark

Benchmark results

create_5_000_run_trees: Mean +- std dev: 650 ms +- 48 ms
create_10_000_run_trees: Mean +- std dev: 1.31 sec +- 0.09 sec
create_20_000_run_trees: Mean +- std dev: 1.32 sec +- 0.08 sec
dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 714 us +- 13 us
dumps_class_nested_py_leaf_50x100: Mean +- std dev: 24.9 ms +- 0.3 ms
dumps_class_nested_py_leaf_100x200: Mean +- std dev: 104 ms +- 3 ms
dumps_dataclass_nested_50x100: Mean +- std dev: 25.3 ms +- 0.2 ms
WARNING: the benchmark result may be unstable
* the standard deviation (15.5 ms) is 22% of the mean (69.4 ms)

Try to rerun the benchmark with more runs, values and/or loops.
Run 'python -m pyperf system tune' command to reduce the system jitter.
Use pyperf stats, pyperf dump and pyperf hist to analyze results.
Use --quiet option to hide these warnings.

dumps_pydantic_nested_50x100: Mean +- std dev: 69.4 ms +- 15.5 ms
dumps_pydanticv1_nested_50x100: Mean +- std dev: 193 ms +- 4 ms

Check notice on line 1 in python/langsmith/_internal/_background_thread.py

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------------+----------+------------------------+
| Benchmark                                     | main     | changes                |
+===============================================+==========+========================+
| dumps_pydanticv1_nested_50x100                | 216 ms   | 193 ms: 1.12x faster   |
+-----------------------------------------------+----------+------------------------+
| dumps_dataclass_nested_50x100                 | 25.5 ms  | 25.3 ms: 1.01x faster  |
+-----------------------------------------------+----------+------------------------+
| create_5_000_run_trees                        | 657 ms   | 650 ms: 1.01x faster   |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_50x100             | 25.0 ms  | 24.9 ms: 1.00x faster  |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_100x200            | 104 ms   | 104 ms: 1.00x faster   |
+-----------------------------------------------+----------+------------------------+
| create_10_000_run_trees                       | 1.30 sec | 1.31 sec: 1.01x slower |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_branch_and_leaf_200x400 | 708 us   | 714 us: 1.01x slower   |
+-----------------------------------------------+----------+------------------------+
| create_20_000_run_trees                       | 1.29 sec | 1.32 sec: 1.02x slower |
+-----------------------------------------------+----------+------------------------+
| dumps_pydantic_nested_50x100                  | 66.6 ms  | 69.4 ms: 1.04x slower  |
+-----------------------------------------------+----------+------------------------+
| Geometric mean                                | (ref)    | 1.01x faster           |
+-----------------------------------------------+----------+------------------------+
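
The "Geometric mean" row can be sanity-checked from the per-benchmark means in the comparison. The sketch below is not pyperf's own code: it assumes the summary is the geometric mean of the main/changes ratios and uses the rounded means as reported, so it only reproduces the figure to the displayed precision. The names `timings` and `geometric_mean_speedup` are made up for illustration.

```python
import math

# (main, changes) mean timings copied from the comparison; each pair shares a unit.
timings = {
    "dumps_pydanticv1_nested_50x100": (216, 193),                 # ms
    "dumps_dataclass_nested_50x100": (25.5, 25.3),                # ms
    "create_5_000_run_trees": (657, 650),                         # ms
    "dumps_class_nested_py_leaf_50x100": (25.0, 24.9),            # ms
    "dumps_class_nested_py_leaf_100x200": (104, 104),             # ms
    "create_10_000_run_trees": (1.30, 1.31),                      # sec
    "dumps_class_nested_py_branch_and_leaf_200x400": (708, 714),  # us
    "create_20_000_run_trees": (1.29, 1.32),                      # sec
    "dumps_pydantic_nested_50x100": (66.6, 69.4),                 # ms
}


def geometric_mean_speedup(pairs):
    """Geometric mean of main/changes ratios; above 1 means 'changes' is faster."""
    ratios = [main / changes for main, changes in pairs]
    return math.exp(sum(math.log(r) for r in ratios) / len(ratios))


speedup = geometric_mean_speedup(timings.values())
print(f"{speedup:.2f}x faster")  # prints "1.01x faster"
```

The single 1.12x gain on `dumps_pydanticv1_nested_50x100` outweighs the four small slowdowns, and one of those (`dumps_pydantic_nested_50x100`) carries the instability warning, so the overall figure is best read as "no measurable change".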

import concurrent.futures as cf
import functools
@@ -217,13 +217,14 @@
     scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"]
     use_multipart = batch_ingest_config.get("use_multipart_endpoint", False)

-    if ls_utils.get_env_var("DISABLE_RUN_COMPRESSION") is None and use_multipart:
+    disable_compression = ls_utils.get_env_var("DISABLE_RUN_COMPRESSION")
+    if not ls_utils.is_truish(disable_compression) and use_multipart:
         if not (client.info.instance_flags or {}).get(
             "zstd_compression_enabled", False
         ):
             logger.warning(
-                "Zstd compression is not enabled. Please update to the latest "
-                "version of LangSmith."
+                "Run compression is not enabled. Please update to the latest "
+                "version of LangSmith. Falling back to regular multipart ingestion."
             )
         else:
             client._futures = set()
@@ -234,6 +235,9 @@
                 args=(weakref.ref(client),),
             ).start()

+            # client.tracing_queue.join()
+            return
+
     sub_threads: List[threading.Thread] = []
     # 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
     num_known_refs = 3
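
To make the branch in this hunk easier to review, here is a minimal sketch of the decision it encodes. `should_compress` is a hypothetical helper, not part of the codebase; it takes the already-parsed opt-out flag, the `use_multipart_endpoint` setting, and the server's `instance_flags`, and returns whether the compressed path would be taken.

```python
from typing import Any, Mapping, Optional


def should_compress(
    disable_requested: bool,
    use_multipart: bool,
    instance_flags: Optional[Mapping[str, Any]],
) -> bool:
    """Hypothetical helper mirroring the gate shown in the hunk above."""
    # The user opted out, or the multipart endpoint is unavailable: no compression.
    if disable_requested or not use_multipart:
        return False
    # The server must advertise support; a missing flag dict counts as "not enabled",
    # which is the case where the diff logs a warning and falls back.
    return bool((instance_flags or {}).get("zstd_compression_enabled", False))


flags = {"zstd_compression_enabled": True}
print(should_compress(False, True, flags))  # True
print(should_compress(False, True, None))   # False
print(should_compress(True, True, flags))   # False
```

Only the first combination starts the compression thread; the second is the case that logs the "Falling back to regular multipart ingestion" warning.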
15 changes: 12 additions & 3 deletions python/langsmith/client.py
@@ -1278,7 +1278,10 @@ def create_run(
             if self._pyo3_client is not None:
                 self._pyo3_client.create_run(run_create)
             elif self.compressed_runs is not None:
-                assert self._data_available_event is not None
+                if self._data_available_event is None:
+                    raise ValueError(
+                        "Run compression is enabled but threading event is not configured"
+                    )
                 serialized_op = serialize_run_dict("post", run_create)
                 multipart_form = (
                     serialized_run_operation_to_multipart_parts_and_context(
@@ -1981,7 +1984,10 @@ def update_run(
                     )
                 )
                 with self.compressed_runs.lock:
-                    assert self._data_available_event is not None
+                    if self._data_available_event is None:
+                        raise ValueError(
+                            "Run compression is enabled but threading event is not configured"
+                        )
                     compress_multipart_parts_and_context(
                         multipart_form,
                         self.compressed_runs,
@@ -2017,7 +2023,10 @@ def flush_compressed_runs(self, attempts: int = 3) -> None:
         if self.compressed_runs is None:
             return

-        assert self._futures is not None
+        if self._futures is None:
+            raise ValueError(
+                "Run compression is enabled but request pool futures is not set"
+            )

         # Attempt to drain and send any remaining data
         from langsmith._internal._background_thread import (
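
The three hunks above swap `assert x is not None` for an explicit check. One likely motivation, though the diff does not state it, is that `assert` statements are removed when Python runs with `-O`, whereas an explicit `raise` always executes. Both forms narrow the `Optional` type for mypy. A minimal sketch of the pattern, using a hypothetical `SketchClient` in place of the real `Client`:

```python
import threading
from typing import Optional


class SketchClient:
    """Hypothetical stand-in holding only the attribute the guard inspects."""

    def __init__(self, event: Optional[threading.Event]) -> None:
        self._data_available_event = event

    def signal_data_available(self) -> None:
        # An explicit check survives `python -O`; an assert would be stripped.
        if self._data_available_event is None:
            raise ValueError(
                "Run compression is enabled but threading event is not configured"
            )
        # After the check, type checkers treat the attribute as threading.Event.
        self._data_available_event.set()


try:
    SketchClient(None).signal_data_available()
except ValueError as exc:
    print(f"misconfigured: {exc}")
```

With a real `threading.Event` passed in, the same call sets the event, which is how the diff wakes the compression thread.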
12 changes: 12 additions & 0 deletions python/langsmith/utils.py
@@ -795,3 +795,15 @@ def _get_function_name(fn: Callable, depth: int = 0) -> str:
         return _get_function_name(fn.__call__, depth + 1)

     return str(fn)
+
+
+def is_truish(val: Any) -> bool:
+    """Check if the value is truish.
+
+    Args:
+        val (Any): The value to check.
+
+    Returns:
+        bool: True if the value is truish, False otherwise.
+    """
+    return val is True or val == "true" or val == "True" or val == "TRUE"
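
The practical effect of `is_truish` on `DISABLE_RUN_COMPRESSION` is easiest to see by comparing it with the `is None` check it replaces in `_background_thread.py`. The sketch below copies the function body from this diff; `old_check_disabled` is a hypothetical name for the previous condition, written out only for comparison.

```python
from typing import Any


def is_truish(val: Any) -> bool:
    # Body copied from the utils.py hunk in this commit.
    return val is True or val == "true" or val == "True" or val == "TRUE"


def old_check_disabled(val: Any) -> bool:
    # Previous behaviour: any value at all, even "false", disabled compression.
    return val is not None


for raw in (None, "true", "TRUE", "false", "1", ""):
    print(f"{raw!r:8} old={old_check_disabled(raw)!s:5} new={is_truish(raw)}")
```

At this commit `"1"` is not treated as truish, so `DISABLE_RUN_COMPRESSION=1` would leave compression on; commit 1eaa9d0 in this PR is titled "add 1 to is truish". Mixed-case spellings such as `"tRuE"` are also not matched by the three literal comparisons.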
4 changes: 3 additions & 1 deletion python/tests/unit_tests/test_client.py
@@ -2102,13 +2102,15 @@ def test_create_run_with_zstd_compression(mock_session_cls: mock.Mock) -> None:
         info=info,
     )

+    time.sleep(1)
+
     # Create a few runs with larger payloads so there's something to compress
     for i in range(2):
         run_id = uuid.uuid4()
         client.create_run(
             name=f"my_test_run_{i}",
             run_type="llm",
-            inputs={"some_key": "some_val" * 1000},  # bigger input
+            inputs={"some_key": "some_val" * 1000},
             id=run_id,
             trace_id=run_id,
             dotted_order=str(run_id),
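
The test comment says the inputs are made larger "so there's something to compress". The sketch below illustrates why a repeated string is a good fixture. It uses the standard library's `zlib` purely as a stand-in, because zstd is not in the standard library here; it is not the compressor the client uses, and the exact sizes will differ.

```python
import json
import zlib

# Same shape of payload as the test's inputs: one short string repeated 1000 times.
payload = json.dumps({"some_key": "some_val" * 1000}).encode("utf-8")

# zlib stands in for zstd here; both exploit the repetition in the payload.
compressed = zlib.compress(payload)

print(f"raw: {len(payload)} bytes, compressed: {len(compressed)} bytes")
assert zlib.decompress(compressed) == payload  # lossless round trip
```

A payload this repetitive shrinks by well over an order of magnitude, so an assertion that the request body is smaller than the raw data has plenty of margin.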