Skip to content

Commit

Permalink
signal bg threads data is available instead of sleeping
Browse files Browse the repository at this point in the history
  • Loading branch information
angus-langchain committed Dec 13, 2024
1 parent efa4bd6 commit 0e06bde
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
37 changes: 19 additions & 18 deletions python/langsmith/_internal/_background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import logging
import sys
import threading
import time
import weakref
from multiprocessing import cpu_count
from queue import Empty, Queue
Expand Down Expand Up @@ -276,23 +275,25 @@ def keep_thread_active() -> bool:
# for now, keep thread alive
return True

while keep_thread_active():
try:
data_stream = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)
if data_stream is not None:
try:
HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req, data_stream
)
except RuntimeError:
client._send_compressed_multipart_req(data_stream)
else:
time.sleep(0.05)
except Exception:
logger.error("Error in tracing compression thread", exc_info=True)
time.sleep(0.1) # Wait before retrying on error
while True:
triggered = client._data_available_event.wait(timeout=0.05)
if not keep_thread_active():
break
if not triggered:
continue
client._data_available_event.clear()

data_stream = _tracing_thread_drain_compressed_buffer(
client, size_limit, size_limit_bytes
)

if data_stream is not None:
try:
HTTP_REQUEST_THREAD_POOL.submit(
client._send_compressed_multipart_req, data_stream
)
except RuntimeError:
client._send_compressed_multipart_req(data_stream)

# Drain the buffer on exit
try:
Expand Down
4 changes: 4 additions & 0 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ class Client:
"_run_count",
"_buffer_lock",
"compressed_runs_buffer",
"_data_available_event",
]

def __init__(
Expand Down Expand Up @@ -511,6 +512,7 @@ def __init__(
level=3, threads=-1
).stream_writer(self.compressed_runs_buffer, closefd=False)
self._buffer_lock: threading.Lock = threading.Lock()
self._data_available_event = threading.Event()
self._run_count: int = 0
else:
self.compressed_runs_buffer = None
Expand Down Expand Up @@ -1334,6 +1336,7 @@ def create_run(
multipart_form, self.compressor_writer, self.boundary
)
self._run_count += 1
self._data_available_event.set()
elif self.tracing_queue is not None:
serialized_op = serialize_run_dict("post", run_create)
self.tracing_queue.put(
Expand Down Expand Up @@ -1863,6 +1866,7 @@ def update_run(
multipart_form, self.compressor_writer, self.boundary
)
self._run_count += 1
self._data_available_event.set()
elif self.tracing_queue is not None:
self.tracing_queue.put(
TracingQueueItem(data["dotted_order"], serialized_op)
Expand Down

0 comments on commit 0e06bde

Please sign in to comment.