Skip to content

Commit

Permalink
black reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
angus-langchain committed Dec 12, 2024
1 parent 35e9843 commit 59b5f27
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions python/langsmith/_internal/_background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ def _tracing_thread_drain_compressed_buffer(
client.compressed_runs_buffer = io.BytesIO()
client.compressor_writer = zstd.ZstdCompressor(
level=3, threads=-1
).stream_writer(
client.compressed_runs_buffer, closefd=False
)
).stream_writer(client.compressed_runs_buffer, closefd=False)
client._run_count = 0

filled_buffer.seek(0)
Expand Down Expand Up @@ -234,23 +232,26 @@ def keep_thread_active() -> bool:
):
_tracing_thread_handle_batch(client, tracing_queue, next_batch, use_multipart)


def _worker_thread_func(client: Client, request_queue: Queue) -> None:
while True:
try:
data_stream = request_queue.get()

if data_stream is None:
break

client._send_compressed_multipart_req(data_stream)

except Exception:
logger.error("Error in worker thread processing request", exc_info=True)
finally:
request_queue.task_done()

def tracing_control_thread_func_compress_parallel(client_ref: weakref.ref[Client]
) -> None:

def tracing_control_thread_func_compress_parallel(
client_ref: weakref.ref[Client],
) -> None:
client = client_ref()
if client is None:
return
Expand All @@ -263,7 +264,7 @@ def tracing_control_thread_func_compress_parallel(client_ref: weakref.ref[Client
num_workers = min(4, os.cpu_count())
request_queue: Queue = Queue(maxsize=num_workers * 2)
workers = []

for _ in range(num_workers):
worker = threading.Thread(
target=_worker_thread_func,
Expand Down

0 comments on commit 59b5f27

Please sign in to comment.