
Draft of zstd compression in the PyO3 client implementation. #1398

Closed
wants to merge 2 commits into from
4 changes: 4 additions & 0 deletions python/langsmith/client.py
@@ -1,4 +1,4 @@
"""Client for interacting with the LangSmith API.

Check notice on line 1 in python/langsmith/client.py (GitHub Actions / benchmark)

Benchmark results

create_5_000_run_trees: Mean +- std dev: 638 ms +- 33 ms
create_10_000_run_trees: Mean +- std dev: 1.28 sec +- 0.07 sec
create_20_000_run_trees: Mean +- std dev: 1.34 sec +- 0.10 sec
dumps_class_nested_py_branch_and_leaf_200x400: Mean +- std dev: 708 us +- 11 us
dumps_class_nested_py_leaf_50x100: Mean +- std dev: 25.5 ms +- 0.2 ms
dumps_class_nested_py_leaf_100x200: Mean +- std dev: 105 ms +- 3 ms
dumps_dataclass_nested_50x100: Mean +- std dev: 25.6 ms +- 0.4 ms
WARNING: the benchmark result may be unstable
* the standard deviation (18.7 ms) is 24% of the mean (76.3 ms)
Try to rerun the benchmark with more runs, values and/or loops.
Run 'python -m pyperf system tune' command to reduce the system jitter.
Use pyperf stats, pyperf dump and pyperf hist to analyze results.
Use --quiet option to hide these warnings.
dumps_pydantic_nested_50x100: Mean +- std dev: 76.3 ms +- 18.7 ms
dumps_pydanticv1_nested_50x100: Mean +- std dev: 201 ms +- 5 ms

Check notice on line 1 in python/langsmith/client.py (GitHub Actions / benchmark)

Comparison against main

+-----------------------------------------------+----------+------------------------+
| Benchmark                                     | main     | changes                |
+===============================================+==========+========================+
| dumps_pydanticv1_nested_50x100                | 218 ms   | 201 ms: 1.08x faster   |
+-----------------------------------------------+----------+------------------------+
| create_10_000_run_trees                       | 1.33 sec | 1.28 sec: 1.04x faster |
+-----------------------------------------------+----------+------------------------+
| create_5_000_run_trees                        | 647 ms   | 638 ms: 1.01x faster   |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_branch_and_leaf_200x400 | 710 us   | 708 us: 1.00x faster   |
+-----------------------------------------------+----------+------------------------+
| create_20_000_run_trees                       | 1.33 sec | 1.34 sec: 1.01x slower |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_100x200            | 104 ms   | 105 ms: 1.01x slower   |
+-----------------------------------------------+----------+------------------------+
| dumps_dataclass_nested_50x100                 | 25.4 ms  | 25.6 ms: 1.01x slower  |
+-----------------------------------------------+----------+------------------------+
| dumps_class_nested_py_leaf_50x100             | 25.2 ms  | 25.5 ms: 1.01x slower  |
+-----------------------------------------------+----------+------------------------+
| dumps_pydantic_nested_50x100                  | 67.3 ms  | 76.3 ms: 1.13x slower  |
+-----------------------------------------------+----------+------------------------+
| Geometric mean                                | (ref)    | 1.00x slower           |
+-----------------------------------------------+----------+------------------------+

Use the client to customize API keys / workspace connections, SSL certs,
etc. for tracing.
@@ -556,6 +556,9 @@
batch_size = 100
batch_timeout_millis = 1000
worker_threads = 1
compression_level = int(
ls_utils.get_env_var("RUN_COMPRESSION_LEVEL", 3)
)

try:
self._pyo3_client = langsmith_pyo3.BlockingTracingClient(
@@ -565,6 +568,7 @@
batch_size,
batch_timeout_millis,
worker_threads,
compression_level,
)
except Exception as e:
logger.warning(
47 changes: 47 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions rust/Cargo.toml
@@ -11,6 +11,7 @@ resolver = "2"
chrono = "0.4.38"
flate2 = "1.0.34"
futures = "0.3.31"
http = "1.2.0"
rayon = "1.10.0"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
@@ -20,6 +21,7 @@ tokio = { version = "1", features = ["full"] }
tokio-util = "0.7.12"
ureq = "2.10.1"
uuid = { version = "1.11.0", features = ["v4"] }
zstd = { version = "0.13.2", features = ["zstdmt"] }

# Use rustls instead of OpenSSL, because OpenSSL is a nightmare when compiling across platforms.
# OpenSSL is a default feature, so we have to disable all default features, then re-add
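Both new workspace dependencies are narrow in scope: `zstd` 0.13.2 is added with the `zstdmt` feature, which is what lets the encoder used later in the run processor spread work across threads, and `http` 1.2.0 appears to be used only for its typed header-name constants. A minimal sketch (separate from this PR) of what those constants give over hand-written strings:

```rust
fn main() {
    // The http crate exposes canonical, lowercase header names as constants,
    // so a typo in a header name becomes a compile error instead of a silent bug.
    assert_eq!(http::header::CONTENT_TYPE.as_str(), "content-type");
    assert_eq!(http::header::CONTENT_ENCODING.as_str(), "content-encoding");
}
```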
2 changes: 2 additions & 0 deletions rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs
@@ -27,6 +27,7 @@ impl BlockingTracingClient {
batch_size: usize,
batch_timeout_millis: u64,
worker_threads: usize,
compression_level: i32,
) -> PyResult<Self> {
let config = langsmith_tracing_client::client::blocking::ClientConfig {
endpoint,
@@ -39,6 +40,7 @@

headers: None, // TODO: support custom headers
num_worker_threads: worker_threads,
compression_level,
};

let client = RustTracingClient::new(config)
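Note that the level crosses the Python/Rust boundary as a plain `i32` and is not range-checked in this draft. A hedged sketch of what a guard could look like if one were added later; `sanitize_level` and the fallback to level 3 (zstd's customary default) are illustrative choices, not part of this PR:

```rust
// Illustrative only: accept a caller-supplied level and fall back to 3 when it
// lies outside the range the zstd crate reports as supported.
fn sanitize_level(requested: i32) -> i32 {
    if zstd::compression_level_range().contains(&requested) {
        requested
    } else {
        3
    }
}

fn main() {
    assert_eq!(sanitize_level(3), 3);
    assert_eq!(sanitize_level(1_000), 3); // out of range: use the default instead
}
```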
2 changes: 2 additions & 0 deletions rust/crates/langsmith-tracing-client/Cargo.toml
@@ -17,6 +17,8 @@ futures = { workspace = true }
rayon = { workspace = true }
ureq = { workspace = true }
flate2 = { workspace = true }
zstd = { workspace = true }
http = { workspace = true }

[dev-dependencies]
multer = "3.1.0"
Changes to an additional file (path not shown in this rendering):
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{mpsc, Arc, Mutex};
use std::thread::available_parallelism;
use std::time::{Duration, Instant};

use rayon::iter::{IntoParallelIterator, ParallelIterator};
@@ -266,14 +267,34 @@ impl RunProcessor {
        for (part_name, part) in json_parts.into_iter().chain(attachment_parts) {
            form = form.part(part_name, part);
        }
        let content_type = format!("multipart/form-data; boundary={}", form.boundary());

        let compressed_data = {
            let mut buffer = Vec::with_capacity(4096);
            let mut encoder = zstd::Encoder::new(&mut buffer, self.config.compression_level)
                .and_then(|mut encoder| {
                    encoder.multithread(
                        available_parallelism().map(|p| p.get() as u32).unwrap_or_default(),
                    )?;
                    Ok(encoder)
                })
                .map_err(|e| TracingClientError::IoError(format!("{e}")))?;
            std::io::copy(&mut form.reader(), &mut encoder)
                .map_err(|e| TracingClientError::IoError(format!("{e}")))?;
            encoder.finish().map_err(|e| TracingClientError::IoError(format!("{e}")))?;

            buffer
        };

        // send the zstd-compressed multipart POST request
        let start_send_batch = Instant::now();
        let response = self
            .http_client
            .post(format!("{}/runs/multipart", self.config.endpoint))
            .headers(self.config.headers.as_ref().cloned().unwrap_or_default())
            .header(http::header::CONTENT_TYPE, content_type)
            .header(http::header::CONTENT_ENCODING, "zstd")
            .body(compressed_data)
            .send()?;
        // println!("Sending batch took {:?}", start_send_batch.elapsed());
// println!("Sending batch took {:?}", start_send_batch.elapsed());

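For reference, a self-contained sketch of the encode path the hunk above uses: a `zstd::Encoder` writing into a `Vec<u8>`, opted into multithreading (which only has an effect with the `zstdmt` feature enabled in the workspace), then a round-trip check with `zstd::decode_all`. The payload, buffer size, and function name are illustrative, not taken from this PR:

```rust
use std::io::{copy, Cursor};
use std::thread::available_parallelism;

// Compress an in-memory payload the way the processor compresses the serialized
// multipart body: stream it through a (possibly multithreaded) zstd Encoder and
// call finish() to flush the final frame into the output buffer.
fn compress(payload: &[u8], level: i32) -> std::io::Result<Vec<u8>> {
    let mut buffer = Vec::with_capacity(4096);
    let mut encoder = zstd::Encoder::new(&mut buffer, level)?;
    // A worker count of 0 (the unwrap_or_default fallback) leaves multithreading off.
    encoder.multithread(available_parallelism().map(|p| p.get() as u32).unwrap_or_default())?;
    copy(&mut Cursor::new(payload), &mut encoder)?;
    encoder.finish()?;
    Ok(buffer)
}

fn main() -> std::io::Result<()> {
    let compressed = compress(b"example multipart body", 3)?;
    // The receiving server can undo this with a streaming zstd decoder;
    // decode_all is the one-shot equivalent.
    let original = zstd::decode_all(compressed.as_slice())?;
    assert_eq!(original, b"example multipart body");
    Ok(())
}
```

Compressing the fully assembled multipart body (rather than individual parts) is also why the hunk sets `Content-Type` by hand: once the request is sent with `.body(...)` instead of `.multipart(...)`, reqwest no longer adds the boundary-bearing header itself.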
Changes to an additional file (path not shown in this rendering):
@@ -19,6 +19,7 @@ pub struct ClientConfig {
pub batch_timeout: Duration,
pub headers: Option<HeaderMap>,
pub num_worker_threads: usize,
pub compression_level: i32,
}

pub struct TracingClient {