From 25f4f19aa3ad5ebea11da8c95e1140a6ca181bb8 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Thu, 12 Dec 2024 15:41:00 -0800 Subject: [PATCH] fix thread garbage collection --- python/langsmith/_internal/_background_thread.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index 6b8bbdcd7..31513272a 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -247,6 +247,7 @@ def tracing_control_thread_func_compress_parallel( size_limit: int = batch_ingest_config["size_limit"] size_limit_bytes = batch_ingest_config.get("size_limit_bytes", 20_971_520) assert size_limit_bytes is not None + num_known_refs = 3 def keep_thread_active() -> bool: # if `client.cleanup()` was called, stop thread @@ -257,7 +258,19 @@ def keep_thread_active() -> bool: if not threading.main_thread().is_alive(): # main thread is dead. should not be active return False - return True + if hasattr(sys, "getrefcount"): + # check if client refs count indicates we're the only remaining + # reference to the client + + # Count active threads + thread_pool = HTTP_REQUEST_THREAD_POOL._threads + active_count = sum(1 for thread in thread_pool if thread is not None and thread.is_alive()) + + return sys.getrefcount(client) > num_known_refs + active_count + else: + # in PyPy, there is no sys.getrefcount attribute + # for now, keep thread alive + return True while keep_thread_active(): try: @@ -269,7 +282,6 @@ def keep_thread_active() -> bool: HTTP_REQUEST_THREAD_POOL.submit( client._send_compressed_multipart_req, data_stream ) - print("submitted request") except RuntimeError: client._send_compressed_multipart_req(data_stream) else: