From b76e662caa6a1b0f9402b9c3c741c51d16c12581 Mon Sep 17 00:00:00 2001 From: Angus Jelinek Date: Thu, 12 Dec 2024 16:00:29 -0800 Subject: [PATCH] lint --- .../langsmith/_internal/_background_thread.py | 15 +++++++------ python/langsmith/client.py | 21 +++++++++---------- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/python/langsmith/_internal/_background_thread.py b/python/langsmith/_internal/_background_thread.py index 31513272a..4964e4258 100644 --- a/python/langsmith/_internal/_background_thread.py +++ b/python/langsmith/_internal/_background_thread.py @@ -1,15 +1,14 @@ from __future__ import annotations +import concurrent.futures import functools import io import logging -import os import sys import threading import time import weakref from multiprocessing import cpu_count -import concurrent.futures from queue import Empty, Queue from typing import ( TYPE_CHECKING, @@ -38,7 +37,10 @@ logger = logging.getLogger("langsmith.client") -HTTP_REQUEST_THREAD_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) +HTTP_REQUEST_THREAD_POOL = concurrent.futures.ThreadPoolExecutor( + max_workers=cpu_count() +) + @functools.total_ordering class TracingQueueItem: @@ -261,10 +263,12 @@ def keep_thread_active() -> bool: if hasattr(sys, "getrefcount"): # check if client refs count indicates we're the only remaining # reference to the client - + # Count active threads thread_pool = HTTP_REQUEST_THREAD_POOL._threads - active_count = sum(1 for thread in thread_pool if thread is not None and thread.is_alive()) + active_count = sum( + 1 for thread in thread_pool if thread is not None and thread.is_alive() + ) return sys.getrefcount(client) > num_known_refs + active_count else: @@ -305,7 +309,6 @@ def keep_thread_active() -> bool: except RuntimeError: client._send_compressed_multipart_req(final_data_stream) - except Exception: logger.error("Error in final cleanup", exc_info=True) diff --git a/python/langsmith/client.py b/python/langsmith/client.py index 36b4c45da..e09c49b6d 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -1824,8 +1824,7 @@ def update_run( if attachments: data["attachments"] = attachments use_multipart = ( - (self.tracing_queue is not None - or self.compressed_runs_buffer is not None) + (self.tracing_queue is not None or self.compressed_runs_buffer is not None) # batch ingest requires trace_id and dotted_order to be set and data["trace_id"] is not None and data["dotted_order"] is not None @@ -1887,16 +1886,16 @@ def _update_run(self, run_update: dict) -> None: }, ) - def flush_compressed_runs(self, attempts: int = 3) -> None: - """ - Forcefully flush the currently buffered compressed runs. - """ + """Force flush the currently buffered compressed runs.""" if not self.compress_traces or self.compressed_runs_buffer is None: return # Attempt to drain and send any remaining data - from langsmith._internal._background_thread import _tracing_thread_drain_compressed_buffer, HTTP_REQUEST_THREAD_POOL + from langsmith._internal._background_thread import ( + HTTP_REQUEST_THREAD_POOL, + _tracing_thread_drain_compressed_buffer, + ) final_data_stream = _tracing_thread_drain_compressed_buffer( self, size_limit=1, size_limit_bytes=1 @@ -1909,7 +1908,9 @@ def flush_compressed_runs(self, attempts: int = 3) -> None: future = None try: future = HTTP_REQUEST_THREAD_POOL.submit( - self._send_compressed_multipart_req, final_data_stream, attempts=attempts + self._send_compressed_multipart_req, + final_data_stream, + attempts=attempts, ) except RuntimeError: # In case the ThreadPoolExecutor is already shutdown @@ -1920,9 +1921,7 @@ def flush_compressed_runs(self, attempts: int = 3) -> None: cf.wait([future]) def flush(self) -> None: - """ - A convenience method to flush either queue or compressed buffer, depending on mode. - """ + """Flush either queue or compressed buffer, depending on mode.""" if self.compress_traces and self.compressed_runs_buffer is not None: self.flush_compressed_runs() elif self.tracing_queue is not None: