diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py
index a856df557..2e8c599d3 100644
--- a/python/langsmith/_internal/_background_thread.py
+++ b/python/langsmith/_internal/_background_thread.py
@@ -114,9 +114,7 @@ def _tracing_thread_drain_compressed_buffer(
         client.compressed_runs_buffer = io.BytesIO()
         client.compressor_writer = zstd.ZstdCompressor(
             level=3, threads=-1
-        ).stream_writer(
-            client.compressed_runs_buffer, closefd=False
-        )
+        ).stream_writer(client.compressed_runs_buffer, closefd=False)
         client._run_count = 0

     filled_buffer.seek(0)
@@ -234,23 +232,26 @@ def keep_thread_active() -> bool:
     ):
         _tracing_thread_handle_batch(client, tracing_queue, next_batch, use_multipart)

+
 def _worker_thread_func(client: Client, request_queue: Queue) -> None:
     while True:
         try:
             data_stream = request_queue.get()
-
+
             if data_stream is None:
                 break
-
+
             client._send_compressed_multipart_req(data_stream)
-
+
         except Exception:
             logger.error("Error in worker thread processing request", exc_info=True)
         finally:
             request_queue.task_done()

-def tracing_control_thread_func_compress_parallel(client_ref: weakref.ref[Client]
-    ) -> None:
+
+def tracing_control_thread_func_compress_parallel(
+    client_ref: weakref.ref[Client],
+) -> None:
     client = client_ref()
     if client is None:
         return
@@ -263,7 +264,7 @@ def tracing_control_thread_func_compress_parallel(client_ref: weakref.ref[Client
     num_workers = min(4, os.cpu_count())
     request_queue: Queue = Queue(maxsize=num_workers * 2)
     workers = []
-
+
     for _ in range(num_workers):
         worker = threading.Thread(
            target=_worker_thread_func,
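
For context, the code touched by the second and third hunks follows a standard worker-pool pattern: a bounded Queue feeds a small set of threads, and a None sentinel per worker signals shutdown. Below is a minimal, self-contained sketch of that same pattern, not the SDK's implementation; the names _worker and run_pool are illustrative, and a debug log stands in for client._send_compressed_multipart_req.

```python
import logging
import os
import threading
from queue import Queue

logger = logging.getLogger(__name__)


def _worker(request_queue: Queue) -> None:
    """Drain the queue until a None sentinel arrives (hypothetical stand-in for
    _worker_thread_func; logging replaces client._send_compressed_multipart_req)."""
    while True:
        try:
            item = request_queue.get()
            if item is None:  # sentinel: this worker should exit
                break
            logger.debug("processing %r", item)
        except Exception:
            logger.error("Error in worker thread processing request", exc_info=True)
        finally:
            request_queue.task_done()


def run_pool(items) -> None:
    # Same sizing as the diff, with a guard for os.cpu_count() returning None.
    num_workers = min(4, os.cpu_count() or 1)
    # Bounded queue: producers block when workers fall behind (backpressure).
    request_queue: Queue = Queue(maxsize=num_workers * 2)
    workers = [
        threading.Thread(target=_worker, args=(request_queue,), daemon=True)
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()
    for item in items:
        request_queue.put(item)
    for _ in workers:
        request_queue.put(None)  # one sentinel per worker
    for w in workers:
        w.join()


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    run_pool(range(8))
```

The bounded maxsize=num_workers * 2 mirrors the diff's queue construction: it keeps the producer from buffering unbounded compressed payloads in memory when the upload workers cannot keep up.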