Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): Enable zstd compression by default #1384

Merged
merged 25 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9087b2e
feat(python): Enable compression by default
angus-langchain Jan 6, 2025
bcb5ffd
update deps
angus-langchain Jan 6, 2025
ec73127
mypy
angus-langchain Jan 6, 2025
3fb3d5d
feat(python): Enable compression only for multipart
angus-langchain Jan 7, 2025
95eccf8
feat(python): Enable compression for multipart only
angus-langchain Jan 7, 2025
eae4ccf
feat(python): fix multipart logic
angus-langchain Jan 7, 2025
3fe4a29
Merge branch 'main' into angus/enable-compression
angus-langchain Jan 7, 2025
c242624
move compressed runs to background thread
angus-langchain Jan 7, 2025
4beb54e
feat(python):check for instance flag
angus-langchain Jan 8, 2025
b4cb53e
feat(python): add instance flag check
angus-langchain Jan 8, 2025
57109f0
fix(python): remove print
angus-langchain Jan 8, 2025
3440916
feat(python):Add asserts for mypy
angus-langchain Jan 8, 2025
f0ec00c
feat(python): Periodically flush buffer
angus-langchain Jan 8, 2025
02b17ca
feat(python): Revert back to regular tracing if compression not enabled
angus-langchain Jan 8, 2025
46eea0f
keep tracing thread alive
angus-langchain Jan 8, 2025
6d3c9f5
format
angus-langchain Jan 8, 2025
61c74bb
Add compression unit test
angus-langchain Jan 9, 2025
d67d1c9
format tests
angus-langchain Jan 10, 2025
8c4b3c6
address comments
angus-langchain Jan 13, 2025
23080d3
Add unit tests for disabled compression in instance flags and env vars
angus-langchain Jan 13, 2025
3560088
lint
angus-langchain Jan 13, 2025
1eaa9d0
add 1 to is truish
angus-langchain Jan 13, 2025
36232c3
fix test - reset cache
angus-langchain Jan 13, 2025
20034ae
remove cmt
angus-langchain Jan 13, 2025
8b9e4df
remove return
angus-langchain Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(python):Add asserts for mypy
  • Loading branch information
angus-langchain committed Jan 8, 2025
commit 344091623b05b1b209f2d4bbad9eadc2e496aee7
5 changes: 4 additions & 1 deletion python/langsmith/_internal/_background_thread.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

Check notice on line 1 in python/langsmith/_internal/_background_thread.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

........... WARNING: the benchmark result may be unstable * the standard deviation (78.5 ms) is 11% of the mean (684 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. create_5_000_run_trees: Mean +- std dev: 684 ms +- 79 ms ........... create_10_000_run_trees: Mean +- std dev: 1.31 sec +- 0.09 sec ........... create_20_000_run_trees: Mean +- std dev: 1.35 sec +- 0.07 sec ........... dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 701 us +- 12 us ........... dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.0 ms +- 0.4 ms ........... dumps_class_nested_py_leaf_100x200: Mean +- std dev: 105 ms +- 3 ms ........... dumps_dataclass_nested_50x100: Mean +- std dev: 25.3 ms +- 0.2 ms ........... WARNING: the benchmark result may be unstable * the standard deviation (17.2 ms) is 24% of the mean (71.1 ms) Try to rerun the benchmark with more runs, values and/or loops. Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. dumps_pydantic_nested_50x100: Mean +- std dev: 71.1 ms +- 17.2 ms ........... dumps_pydanticv1_nested_50x100: Mean +- std dev: 196 ms +- 4 ms

Check notice on line 1 in python/langsmith/_internal/_background_thread.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------------+----------+------------------------+ | Benchmark | main | changes | +===============================================+==========+========================+ | dumps_pydanticv1_nested_50x100 | 222 ms | 196 ms: 1.13x faster | +-----------------------------------------------+----------+------------------------+ | create_10_000_run_trees | 1.46 sec | 1.31 sec: 1.11x faster | +-----------------------------------------------+----------+------------------------+ | create_5_000_run_trees | 739 ms | 684 ms: 1.08x faster | +-----------------------------------------------+----------+------------------------+ | create_20_000_run_trees | 1.44 sec | 1.35 sec: 1.07x faster | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_50x100 | 25.2 ms | 25.0 ms: 1.01x faster | +-----------------------------------------------+----------+------------------------+ | dumps_dataclass_nested_50x100 | 25.4 ms | 25.3 ms: 1.01x faster | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_leaf_100x200 | 104 ms | 105 ms: 1.01x slower | +-----------------------------------------------+----------+------------------------+ | dumps_class_nested_py_branch_and_leaf_200x400 | 690 us | 701 us: 1.02x slower | +-----------------------------------------------+----------+------------------------+ | dumps_pydantic_nested_50x100 | 67.6 ms | 71.1 ms: 1.05x slower | +-----------------------------------------------+----------+------------------------+ | Geometric mean | (ref) | 1.03x faster | +-----------------------------------------------+----------+------------------------+

import concurrent.futures as cf
import functools
Expand Down Expand Up @@ -285,11 +285,14 @@
def tracing_control_thread_func_compress_parallel(
client_ref: weakref.ref[Client],
) -> None:
print("tracing_control_thread_func_compress_parallel")
client = client_ref()
if client is None:
return

assert client.compressed_runs is not None
angus-langchain marked this conversation as resolved.
Show resolved Hide resolved
assert client._data_available_event is not None
assert client._futures is not None

batch_ingest_config = _ensure_ingest_config(client.info)
size_limit: int = batch_ingest_config["size_limit"]
size_limit_bytes = batch_ingest_config.get("size_limit_bytes", 20_971_520)
Expand Down
13 changes: 9 additions & 4 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
tracing_control_thread_func as _tracing_control_thread_func,
)
from langsmith._internal._beta_decorator import warn_beta
from langsmith._internal._compressed_runs import CompressedRuns
from langsmith._internal._constants import (
_AUTO_SCALE_UP_NTHREADS_LIMIT,
_BLOCKSIZE_BYTES,
Expand All @@ -100,7 +101,6 @@
serialized_run_operation_to_multipart_parts_and_context,
)
from langsmith._internal._serde import dumps_json as _dumps_json
from python.langsmith._internal._compressed_runs import CompressedRuns

try:
from zoneinfo import ZoneInfo # type: ignore[import-not-found]
Expand Down Expand Up @@ -480,10 +480,12 @@ def __init__(
)
weakref.finalize(self, close_session, self.session)
atexit.register(close_session, session_)
self.compressed_runs: Optional[CompressedRuns] = None
self._data_available_event: Optional[threading.Event] = None
self._futures: Optional[set[cf.Future]] = None
# Initialize auto batching
if auto_batch_tracing:
self.tracing_queue = PriorityQueue()
self.compressed_runs: Optional[CompressedRuns] = None
self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()

threading.Thread(
target=_tracing_control_thread_func,
Expand All @@ -493,7 +495,6 @@ def __init__(
).start()
else:
self.tracing_queue = None
self.compressed_runs = None

# Mount the HTTPAdapter with the retry configuration.
adapter = _LangSmithHttpAdapter(
Expand Down Expand Up @@ -1277,6 +1278,7 @@ def create_run(
if self._pyo3_client is not None:
self._pyo3_client.create_run(run_create)
elif self.compressed_runs is not None:
assert self._data_available_event is not None
angus-langchain marked this conversation as resolved.
Show resolved Hide resolved
serialized_op = serialize_run_dict("post", run_create)
multipart_form = (
serialized_run_operation_to_multipart_parts_and_context(
Expand Down Expand Up @@ -1979,6 +1981,7 @@ def update_run(
)
)
with self.compressed_runs.lock:
assert self._data_available_event is not None
compress_multipart_parts_and_context(
multipart_form,
self.compressed_runs,
Expand Down Expand Up @@ -2014,6 +2017,8 @@ def flush_compressed_runs(self, attempts: int = 3) -> None:
if self.compressed_runs is None:
return

assert self._futures is not None

# Attempt to drain and send any remaining data
from langsmith._internal._background_thread import (
HTTP_REQUEST_THREAD_POOL,
Expand Down
Loading