Skip to content

Commit

Permalink
Drain the Rust client when its last copy in Python is dropped.
Browse files Browse the repository at this point in the history
  • Loading branch information
obi1kenobi committed Dec 12, 2024
1 parent cef4b0e commit 94a337a
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 16 deletions.
2 changes: 1 addition & 1 deletion python/bench/tracing_client_via_pyo3.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def benchmark_run_creation(json_size, num_runs) -> None:

if client._pyo3_client:
# Wait for the queue to drain.
client._pyo3_client.drain()
del client
else:
client.tracing_queue.join()

Expand Down
2 changes: 1 addition & 1 deletion python/bench/tracing_rust_client_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def benchmark_run_creation(num_runs: int, json_size: int, samples: int = 1) -> D
client.create_run(run)

# wait for client queues to be empty
client.drain()
del client
elapsed = time.perf_counter() - start

print(f"runs complete: {elapsed:.3f}s")
Expand Down
17 changes: 11 additions & 6 deletions rust/crates/langsmith-pyo3/src/blocking_tracing_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,19 @@ impl BlockingTracingClient {
Python::allow_threads(slf.py(), || unpacked.client.submit_run_update(run.into_inner()))
.map_err(|e| into_py_err(slf.py(), e))
}

pub fn drain(slf: &Bound<'_, Self>) -> PyResult<()> {
let unpacked = slf.get();
Python::allow_threads(slf.py(), || unpacked.client.drain())
.map_err(|e| into_py_err(slf.py(), e))
}
}

fn into_py_err(py: Python<'_>, e: langsmith_tracing_client::client::TracingClientError) -> PyErr {
crate::errors::TracingClientError::new_err(format!("{e}").into_py(py))
}

impl Drop for BlockingTracingClient {
fn drop(&mut self) {
if Arc::strong_count(&self.client) == 1 {
// This is the only copy of the client in Python,
// so let it drain its in-progress requests before proceeding.
// This runs when Python runs GC on the client, such as when the application is exiting.
self.client.drain().expect("draining failed");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,7 @@ impl RunProcessor {
}

self.drain_sender.send(()).expect("drain_sender should never fail");

// Put this thread to sleep, so we know the remaining `Drain` messages
// are almost certainly answered by other worker threads.
//
// HACK: This is very hacky!
// Drain should only be used for benchmarking.
std::thread::sleep(Duration::from_secs(120));
break;
}
_ => {
buffer.push(queued_run);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ impl TracingClient {
self.sender.send(queued_run).map_err(|_| TracingClientError::QueueFull)
}

/// Complete all in-progress requests, then allow the worker threads to exit.
///
/// Convenience function for the PyO3 bindings, which cannot use [`Self::shutdown`]
/// due to its by-value `self`. This means we cannot `.join()` the threads,
/// but the client is nevertheless unusable after this call.
///
/// Sending further data after a [`Self::drain()`] call has unspecified behavior.
/// It will not cause *undefined behavior* in the programming language sense,
/// but it may e.g. cause errors, panics, or even silently fail, with no guarantees.
pub fn drain(&self) -> Result<(), TracingClientError> {
for _ in &self.handles {
self.sender.send(QueuedRun::Drain).map_err(|_| TracingClientError::QueueFull)?;
Expand Down
2 changes: 1 addition & 1 deletion rust/crates/langsmith-tracing-client/src/client/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,6 @@ pub(crate) enum QueuedRun {
Update(RunUpdateExtended),

Check warning on line 93 in rust/crates/langsmith-tracing-client/src/client/run.rs

View workflow job for this annotation

GitHub Actions / Check lint and rustfmt

Diff in /home/runner/work/langsmith-sdk/langsmith-sdk/rust/crates/langsmith-tracing-client/src/client/run.rs
#[expect(dead_code)]
RunBytes(RunEventBytes),
Drain,
Drain, // Like `Shutdown`, but explicitly sends a message confirming draining is complete.
Shutdown,
}

0 comments on commit 94a337a

Please sign in to comment.