Skip to content

Commit

Permalink
add flush method
Browse files Browse the repository at this point in the history
  • Loading branch information
angus-langchain committed Dec 12, 2024
1 parent 25f4f19 commit 5b0cca4
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,45 @@ def _update_run(self, run_update: dict) -> None:
},
)


def flush_compressed_runs(self, attempts: int = 3) -> None:
"""
Forcefully flush the currently buffered compressed runs.
"""
if not self.compress_traces or self.compressed_runs_buffer is None:
return

# Attempt to drain and send any remaining data
from langsmith._internal._background_thread import _tracing_thread_drain_compressed_buffer, HTTP_REQUEST_THREAD_POOL

final_data_stream = _tracing_thread_drain_compressed_buffer(
self, size_limit=1, size_limit_bytes=1
)

if final_data_stream is not None:
# We have data to send
future = None
try:
future = HTTP_REQUEST_THREAD_POOL.submit(
self._send_compressed_multipart_req, final_data_stream, attempts=attempts
)
except RuntimeError:
# In case the ThreadPoolExecutor is already shutdown
self._send_compressed_multipart_req(final_data_stream, attempts=attempts)

# If we got a future, wait for it to complete
if future is not None:
cf.wait([future])

def flush(self) -> None:
"""
A convenience method to flush either queue or compressed buffer, depending on mode.
"""
if self.compress_traces and self.compressed_runs_buffer is not None:
self.flush_compressed_runs()
elif self.tracing_queue is not None:
self.tracing_queue.join()

def _load_child_runs(self, run: ls_schemas.Run) -> ls_schemas.Run:
"""Load child runs for a given run.
Expand Down

0 comments on commit 5b0cca4

Please sign in to comment.