
feat(python): Enable zstd compression by default #1384

Merged · 25 commits · Jan 14, 2025
Changes from all commits (25 commits)
9087b2e
feat(python): Enable compression by default
angus-langchain Jan 6, 2025
bcb5ffd
update deps
angus-langchain Jan 6, 2025
ec73127
mypy
angus-langchain Jan 6, 2025
3fb3d5d
feat(python): Enable compression only for multipart
angus-langchain Jan 7, 2025
95eccf8
feat(python): Enable compression for multipart only
angus-langchain Jan 7, 2025
eae4ccf
feat(python): fix multipart logic
angus-langchain Jan 7, 2025
3fe4a29
Merge branch 'main' into angus/enable-compression
angus-langchain Jan 7, 2025
c242624
move compressed runs to background thread
angus-langchain Jan 7, 2025
4beb54e
feat(python):check for instance flag
angus-langchain Jan 8, 2025
b4cb53e
feat(python): add instance flag check
angus-langchain Jan 8, 2025
57109f0
fix(python): remove print
angus-langchain Jan 8, 2025
3440916
feat(python):Add asserts for mypy
angus-langchain Jan 8, 2025
f0ec00c
feat(python): Periodically flush buffer
angus-langchain Jan 8, 2025
02b17ca
feat(python): Revert back to regular tracing if compression not enabled
angus-langchain Jan 8, 2025
46eea0f
keep tracing thread alive
angus-langchain Jan 8, 2025
6d3c9f5
format
angus-langchain Jan 8, 2025
61c74bb
Add compression unit test
angus-langchain Jan 9, 2025
d67d1c9
format tests
angus-langchain Jan 10, 2025
8c4b3c6
address comments
angus-langchain Jan 13, 2025
23080d3
Add unit tests for disabled compression in instance flags and env vars
angus-langchain Jan 13, 2025
3560088
lint
angus-langchain Jan 13, 2025
1eaa9d0
add 1 to is truish
angus-langchain Jan 13, 2025
36232c3
fix test - reset cache
angus-langchain Jan 13, 2025
20034ae
remove cmt
angus-langchain Jan 13, 2025
8b9e4df
remove return
angus-langchain Jan 14, 2025
100 changes: 81 additions & 19 deletions python/langsmith/_internal/_background_thread.py
@@ -1,4 +1,4 @@
from __future__ import annotations

Check notice (GitHub Actions / benchmark) on line 1 in python/langsmith/_internal/_background_thread.py

Benchmark results:

create_5_000_run_trees: Mean +- std dev: 660 ms +- 49 ms
create_10_000_run_trees: Mean +- std dev: 1.31 sec +- 0.08 sec
create_20_000_run_trees: Mean +- std dev: 1.31 sec +- 0.08 sec
dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 712 us +- 13 us
dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.1 ms +- 0.2 ms
dumps_class_nested_py_leaf_100x200: Mean +- std dev: 105 ms +- 3 ms
dumps_dataclass_nested_50x100: Mean +- std dev: 25.4 ms +- 0.2 ms
dumps_pydantic_nested_50x100: Mean +- std dev: 69.6 ms +- 15.3 ms (WARNING: result may be unstable — the standard deviation is 22% of the mean; rerun with more runs/values/loops, or run 'python -m pyperf system tune' to reduce system jitter)
dumps_pydanticv1_nested_50x100: Mean +- std dev: 195 ms +- 4 ms

Comparison against main:

+-----------------------------------------------+----------+------------------------+
| Benchmark                                     | main     | changes                |
+===============================================+==========+========================+
| dumps_pydanticv1_nested_50x100                | 216 ms   | 195 ms: 1.11x faster   |
+-----------------------------------------------+----------+------------------------+
| dumps_dataclass_nested_50x100                 | 25.5 ms  | 25.4 ms: 1.01x faster  |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_50x100             | 25.0 ms  | 25.1 ms: 1.00x slower  |
+-----------------------------------------------+----------+------------------------+
| create_10_000_run_trees                       | 1.30 sec | 1.31 sec: 1.00x slower |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_branch_and_leaf_200x400 | 708 us   | 712 us: 1.00x slower   |
+-----------------------------------------------+----------+------------------------+
| create_5_000_run_trees                        | 657 ms   | 660 ms: 1.00x slower   |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_100x200            | 104 ms   | 105 ms: 1.01x slower   |
+-----------------------------------------------+----------+------------------------+
| create_20_000_run_trees                       | 1.29 sec | 1.31 sec: 1.02x slower |
+-----------------------------------------------+----------+------------------------+
| dumps_pydantic_nested_50x100                  | 66.6 ms  | 69.6 ms: 1.04x slower  |
+-----------------------------------------------+----------+------------------------+
| Geometric mean                                | (ref)    | 1.00x faster           |
+-----------------------------------------------+----------+------------------------+

import concurrent.futures as cf
import functools
@@ -6,6 +6,7 @@
import logging
import sys
import threading
import time
import weakref
from multiprocessing import cpu_count
from queue import Empty, Queue
@@ -20,6 +21,7 @@

from langsmith import schemas as ls_schemas
from langsmith import utils as ls_utils
from langsmith._internal._compressed_runs import CompressedRuns
from langsmith._internal._constants import (
_AUTO_SCALE_DOWN_NEMPTY_TRIGGER,
_AUTO_SCALE_UP_NTHREADS_LIMIT,
@@ -100,7 +102,8 @@
def _tracing_thread_drain_compressed_buffer(
client: Client, size_limit: int = 100, size_limit_bytes: int | None = 20_971_520
) -> Tuple[Optional[io.BytesIO], Optional[Tuple[int, int]]]:
assert client.compressed_runs is not None
if client.compressed_runs is None:
return None, None
with client.compressed_runs.lock:
client.compressed_runs.compressor_writer.flush()
current_size = client.compressed_runs.buffer.tell()
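The drain routine above flushes the compressor and inspects the buffer size while holding the client's lock before handing the stream off. A minimal stdlib sketch of the same swap-under-lock pattern (the names here are illustrative, not the library's API):

```python
import io
import threading

lock = threading.Lock()
buffer = io.BytesIO()

def drain(min_bytes: int = 1):
    """Atomically swap out the shared buffer when it holds enough data."""
    global buffer
    with lock:
        if buffer.tell() < min_bytes:
            return None  # nothing worth sending yet
        # Hand off the old buffer and start a fresh one in a single step.
        out, buffer = buffer, io.BytesIO()
    out.seek(0)
    return out

with lock:
    buffer.write(b"payload")

stream = drain()
print(stream is not None)
```

Because the swap happens under the lock, writers never interleave with a send in progress; the actual network I/O then runs outside the lock on the swapped-out buffer.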
@@ -214,6 +217,24 @@
scale_up_qsize_trigger: int = batch_ingest_config["scale_up_qsize_trigger"]
use_multipart = batch_ingest_config.get("use_multipart_endpoint", False)

disable_compression = ls_utils.get_env_var("DISABLE_RUN_COMPRESSION")
if not ls_utils.is_truish(disable_compression) and use_multipart:
if not (client.info.instance_flags or {}).get(
"zstd_compression_enabled", False
):
logger.warning(
"Run compression is not enabled. Please update to the latest "
"version of LangSmith. Falling back to regular multipart ingestion."
)
else:
client._futures = set()
client.compressed_runs = CompressedRuns()
client._data_available_event = threading.Event()
threading.Thread(
target=tracing_control_thread_func_compress_parallel,
args=(weakref.ref(client),),
).start()
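The control thread above is handed only a `weakref.ref` to the client, so the thread itself never keeps the `Client` alive; when the owner is collected, the dereference returns `None` and the thread exits. A small stdlib sketch of that pattern (the `Client`/`worker` names are hypothetical stand-ins):

```python
import gc
import threading
import time
import weakref

stopped = threading.Event()

class Client:
    """Owner object; the worker thread must not keep it alive."""
    def __init__(self):
        self._thread = threading.Thread(
            target=worker, args=(weakref.ref(self),), daemon=True
        )
        self._thread.start()

def worker(client_ref) -> None:
    while True:
        client = client_ref()   # take a strong reference only briefly
        if client is None:
            stopped.set()       # owner collected: shut down cleanly
            return
        # ... do work with client here ...
        del client              # release the strong ref before sleeping
        time.sleep(0.01)

c = Client()
del c
gc.collect()
stopped.wait(timeout=2.0)
print("worker stopped:", stopped.is_set())
```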

sub_threads: List[threading.Thread] = []
# 1 for this func, 1 for getrefcount, 1 for _get_data_type_cached
num_known_refs = 3
@@ -256,6 +277,7 @@
_tracing_thread_handle_batch(
client, tracing_queue, next_batch, use_multipart
)

# drain the queue on exit
while next_batch := _tracing_thread_drain_queue(
tracing_queue, limit=size_limit, block=False
@@ -264,12 +286,20 @@


def tracing_control_thread_func_compress_parallel(
client_ref: weakref.ref[Client],
client_ref: weakref.ref[Client], flush_interval: float = 0.5
) -> None:
client = client_ref()
if client is None:
return

if (
client.compressed_runs is None
or client._data_available_event is None
or client._futures is None
):
logger.error("Required compression attributes not initialized")
return

batch_ingest_config = _ensure_ingest_config(client.info)
size_limit: int = batch_ingest_config["size_limit"]
size_limit_bytes = batch_ingest_config.get("size_limit_bytes", 20_971_520)
@@ -300,35 +330,67 @@
# for now, keep thread alive
return True

last_flush_time = time.monotonic()

while True:
triggered = client._data_available_event.wait(timeout=0.05)
if not keep_thread_active():
break
if not triggered:
continue
client._data_available_event.clear()

data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)
# If data arrived, clear the event and attempt a drain
if triggered:
client._data_available_event.clear()

if data_stream is not None:
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
data_stream, compressed_runs_info = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)
# If we have data, submit the send request
if data_stream is not None:
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(
data_stream,
compressed_runs_info,
)
last_flush_time = time.monotonic()

else:
if (time.monotonic() - last_flush_time) >= flush_interval:
data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
)
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(data_stream, compressed_runs_info)
if data_stream is not None:
try:
cf.wait(
[
HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req,
data_stream,
compressed_runs_info,
)
]
)
except RuntimeError:
client._send_compressed_multipart_req(
data_stream,
compressed_runs_info,
)
last_flush_time = time.monotonic()
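The loop above waits on an event with a short timeout, drains when signalled, and forces a flush whenever `flush_interval` elapses with no traffic. A condensed stdlib sketch of the same wait-or-periodic-flush loop (a `queue.Queue` stands in for the compressed buffer; names are illustrative):

```python
import queue
import threading
import time

def flusher(buf: queue.Queue, data_ready: threading.Event,
            stop: threading.Event, flush_interval: float = 0.5):
    """Drain `buf` when signalled, or at least every `flush_interval` seconds."""
    sent = []
    last_flush = time.monotonic()
    while not stop.is_set():
        triggered = data_ready.wait(timeout=0.05)
        if triggered:
            data_ready.clear()
            while not buf.empty():
                sent.append(buf.get_nowait())
            last_flush = time.monotonic()
        elif time.monotonic() - last_flush >= flush_interval:
            # Periodic flush even without a signal, mirroring the 0.5 s default.
            while not buf.empty():
                sent.append(buf.get_nowait())
            last_flush = time.monotonic()
    return sent

buf = queue.Queue()
for item in (b"a", b"b"):
    buf.put(item)
data_ready = threading.Event()
data_ready.set()
stop = threading.Event()
threading.Timer(0.3, stop.set).start()
result = flusher(buf, data_ready, stop)
print(result)
```

The short 0.05 s wait keeps the thread responsive to shutdown while bounding how long buffered runs can sit before the periodic-flush branch fires.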

# Drain the buffer on exit
# Drain the buffer on exit (final flush)
try:
final_data_stream, compressed_runs_info = (
_tracing_thread_drain_compressed_buffer(
client, size_limit=1, size_limit_bytes=1
) # Force final drain
)
)
if final_data_stream is not None:
try:
17 changes: 1 addition & 16 deletions python/langsmith/_internal/_compressed_runs.py
@@ -1,12 +1,7 @@
import io
import threading

try:
from zstandard import ZstdCompressor # type: ignore[import]

HAVE_ZSTD = True
except ImportError:
HAVE_ZSTD = False
from zstandard import ZstdCompressor # type: ignore[import]

from langsmith import utils as ls_utils

@@ -20,11 +15,6 @@ def __init__(self):
self.lock = threading.Lock()
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
"Install with 'pip install langsmith[compression]'"
)
self.compressor_writer = ZstdCompressor(
level=compression_level, threads=-1
).stream_writer(self.buffer, closefd=False)
@@ -34,11 +24,6 @@ def reset(self):
self.run_count = 0
self.uncompressed_size = 0

if not HAVE_ZSTD:
raise ImportError(
"zstandard package required for compression. "
"Install with 'pip install langsmith[compression]'"
)
self.compressor_writer = ZstdCompressor(
level=compression_level, threads=-1
).stream_writer(self.buffer, closefd=False)
38 changes: 18 additions & 20 deletions python/langsmith/client.py
@@ -77,9 +77,6 @@
from langsmith._internal._background_thread import (
tracing_control_thread_func as _tracing_control_thread_func,
)
from langsmith._internal._background_thread import (
tracing_control_thread_func_compress_parallel as _tracing_control_thread_func_compress_parallel,
)
from langsmith._internal._beta_decorator import warn_beta
from langsmith._internal._compressed_runs import CompressedRuns
from langsmith._internal._constants import (
@@ -476,31 +473,19 @@ def __init__(
# Create a session and register a finalizer to close it
session_ = session if session else requests.Session()
self.session = session_
if ls_utils.get_env_var("USE_RUN_COMPRESSION"):
self._futures: set[cf.Future] = set()
self.compressed_runs: Optional[CompressedRuns] = CompressedRuns()
self._data_available_event = threading.Event()
else:
self.compressed_runs = None

self._info = (
info
if info is None or isinstance(info, ls_schemas.LangSmithInfo)
else ls_schemas.LangSmithInfo(**info)
)
weakref.finalize(self, close_session, self.session)
atexit.register(close_session, session_)
self.compressed_runs: Optional[CompressedRuns] = None
self._data_available_event: Optional[threading.Event] = None
self._futures: Optional[set[cf.Future]] = None
# Initialize auto batching
if auto_batch_tracing and self.compressed_runs is not None:
self.tracing_queue: Optional[PriorityQueue] = None
threading.Thread(
target=_tracing_control_thread_func_compress_parallel,
# arg must be a weakref to self to avoid the Thread object
# preventing garbage collection of the Client object
args=(weakref.ref(self),),
).start()
elif auto_batch_tracing:
self.tracing_queue = PriorityQueue()
if auto_batch_tracing:
self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()

threading.Thread(
target=_tracing_control_thread_func,
@@ -1293,6 +1278,10 @@ def create_run(
if self._pyo3_client is not None:
self._pyo3_client.create_run(run_create)
elif self.compressed_runs is not None:
if self._data_available_event is None:
raise ValueError(
"Run compression is enabled but threading event is not configured"
)
serialized_op = serialize_run_dict("post", run_create)
multipart_form = (
serialized_run_operation_to_multipart_parts_and_context(
@@ -1995,6 +1984,10 @@ def update_run(
)
)
with self.compressed_runs.lock:
if self._data_available_event is None:
raise ValueError(
"Run compression is enabled but threading event is not configured"
)
compress_multipart_parts_and_context(
multipart_form,
self.compressed_runs,
@@ -2030,6 +2023,11 @@ def flush_compressed_runs(self, attempts: int = 3) -> None:
if self.compressed_runs is None:
return

if self._futures is None:
raise ValueError(
"Run compression is enabled but request pool futures is not set"
)

# Attempt to drain and send any remaining data
from langsmith._internal._background_thread import (
HTTP_REQUEST_THREAD_POOL,
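`flush_compressed_runs` must wait for every in-flight send before returning, which is what the `_futures` set enables. A stdlib sketch of collecting futures from a thread pool and blocking on them (the `send` function is a stand-in, not the client's request code):

```python
import concurrent.futures as cf
import time

pool = cf.ThreadPoolExecutor(max_workers=2)
futures: set = set()

def send(payload: bytes) -> int:
    time.sleep(0.01)  # stand-in for the HTTP multipart request
    return len(payload)

for payload in (b"abc", b"de"):
    futures.add(pool.submit(send, payload))

# Flush: block until every in-flight send has completed, as flush() must.
done, not_done = cf.wait(futures, timeout=5)
results = sorted(f.result() for f in done)
pool.shutdown(wait=True)
print(results, len(not_done))
```

Calling `f.result()` also re-raises any exception from a failed send, so a flush that "succeeds" has genuinely delivered (or surfaced the failure of) every queued request.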
12 changes: 12 additions & 0 deletions python/langsmith/utils.py
@@ -795,3 +795,15 @@ def _get_function_name(fn: Callable, depth: int = 0) -> str:
return _get_function_name(fn.__call__, depth + 1)

return str(fn)


def is_truish(val: Any) -> bool:
"""Check if the value is truish.

Args:
val (Any): The value to check.

Returns:
bool: True if ``val`` is ``True`` or one of the strings ``"true"``, ``"True"``, ``"TRUE"``, or ``"1"``; False otherwise.
"""
return val is True or val == "true" or val == "True" or val == "TRUE" or val == "1"
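A quick sketch of how this helper behaves when gating an environment variable (the variable name below is illustrative; the client actually reads `DISABLE_RUN_COMPRESSION` through `ls_utils.get_env_var`):

```python
import os

def is_truish(val) -> bool:
    # Mirrors the helper in this diff: True, "true", "True", "TRUE", and "1" qualify.
    return val is True or val in ("true", "True", "TRUE", "1")

# Typical gate: setting the variable to "1" or "true" disables compression.
disabled = is_truish(os.environ.get("EXAMPLE_DISABLE_RUN_COMPRESSION", ""))
print(is_truish("TRUE"), is_truish("yes"), disabled)
```

Note the strictness: values like `"yes"` or the integer `1` do not qualify, so only the documented spellings flip the flag.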
9 changes: 4 additions & 5 deletions python/poetry.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions python/pyproject.toml
@@ -38,7 +38,6 @@ requests-toolbelt = "^1.0.0"
# Enabled via `langsmith_pyo3` extra: `pip install langsmith[langsmith_pyo3]`.
langsmith-pyo3 = { version = "^0.1.0rc2", optional = true }
# Enabled via `compression` extra: `pip install langsmith[compression]`.
zstandard = { version = "^0.23.0", optional = true }

[tool.poetry.group.dev.dependencies]
pytest = "^7.3.1"
@@ -66,6 +65,7 @@ pytest-socket = "^0.7.0"
pyperf = "^2.7.0"
py-spy = "^0.3.14"
multipart = "^1.0.0"
zstandard = "^0.23.0"

[tool.poetry.group.lint.dependencies]
openai = "^1.10"
@@ -77,7 +77,6 @@ pytest-socket = "^0.7.0"
[tool.poetry.extras]
vcr = ["vcrpy"]
langsmith_pyo3 = ["langsmith-pyo3"]
compression = ["zstandard"]

[build-system]
requires = ["poetry-core"]