From 5b0cca4fffc563958d1208f0c91fabaac0327cd9 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Thu, 12 Dec 2024 15:49:40 -0800 Subject: [PATCH] add flush method --- python/langsmith/client.py | 39 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 391f2be74..14f18ae9e 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1887,6 +1887,45 @@ def _update_run(self, run_update: dict) -> None: }, ) + + def flush_compressed_runs(self, attempts: int = 3) -> None: + """ + Forcefully flush the currently buffered compressed runs. + """ + if not self.compress_traces or self.compressed_runs_buffer is None: + return + + # Attempt to drain and send any remaining data + from langsmith._internal._background_thread import _tracing_thread_drain_compressed_buffer, HTTP_REQUEST_THREAD_POOL + + final_data_stream = _tracing_thread_drain_compressed_buffer( + self, size_limit=1, size_limit_bytes=1 + ) + + if final_data_stream is not None: + # We have data to send + future = None + try: + future = HTTP_REQUEST_THREAD_POOL.submit( + self._send_compressed_multipart_req, final_data_stream, attempts=attempts + ) + except RuntimeError: + # In case the ThreadPoolExecutor is already shutdown + self._send_compressed_multipart_req(final_data_stream, attempts=attempts) + + # If we got a future, wait for it to complete + if future is not None: + cf.wait([future]) + + def flush(self) -> None: + """ + A convenience method to flush either queue or compressed buffer, depending on mode. + """ + if self.compress_traces and self.compressed_runs_buffer is not None: + self.flush_compressed_runs() + elif self.tracing_queue is not None: + self.tracing_queue.join() + def _load_child_runs(self, run: ls_schemas.Run) -> ls_schemas.Run: """Load child runs for a given run.