Skip to content

Commit

Permalink
fix futures waiting
Browse files Browse the repository at this point in the history
  • Loading branch information
angus-langchain committed Dec 19, 2024
1 parent 7b6c201 commit 35d46ed
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
3 changes: 2 additions & 1 deletion python/langsmith/_internal/_background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,10 @@ def keep_thread_active() -> bool:

if data_stream is not None:
try:
HTTP_REQUEST_THREAD_POOL.submit(
future = HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req, data_stream
)
client._futures.add(future)
except RuntimeError:
client._send_compressed_multipart_req(data_stream)

Expand Down
9 changes: 7 additions & 2 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ class Client:
"_buffer_lock",
"compressed_runs_buffer",
"_data_available_event",
"_futures",
]

def __init__(
Expand Down Expand Up @@ -506,6 +507,7 @@ def __init__(
self.session = session_
self.compress_traces = ls_utils.get_env_var("USE_RUN_COMPRESSION")
if self.compress_traces:
self._futures: set[cf.Future] = set()
self.boundary = BOUNDARY
self.compressed_runs_buffer: Optional[io.BytesIO] = io.BytesIO()
self.compressor_writer: zstd.ZstdCompressionWriter = zstd.ZstdCompressor(
Expand Down Expand Up @@ -1916,13 +1918,16 @@ def flush_compressed_runs(self, attempts: int = 3) -> None:
final_data_stream,
attempts=attempts,
)
self._futures.add(future)
except RuntimeError:
# In case the ThreadPoolExecutor is already shutdown
self._send_compressed_multipart_req(final_data_stream, attempts=attempts)

# If we got a future, wait for it to complete
if future is not None:
cf.wait([future])
if self._futures:
done, _ = cf.wait(self._futures, return_when=cf.ALL_COMPLETED)
# Remove completed futures
self._futures.difference_update(done)

def flush(self) -> None:
"""Flush either queue or compressed buffer, depending on mode."""
Expand Down

0 comments on commit 35d46ed

Please sign in to comment.