From 170df57ef4e2c612f1f4e2107416bba68ac17366 Mon Sep 17 00:00:00 2001
From: William Fu-Hinthorn <13333726+hinthornw@users.noreply.github.com>
Date: Tue, 5 Mar 2024 07:46:04 -0800
Subject: [PATCH] noreg

---
 python/langsmith/client.py | 54 ++++++++++++++------------------------
 1 file changed, 20 insertions(+), 34 deletions(-)

diff --git a/python/langsmith/client.py b/python/langsmith/client.py
index 3115c06ca..4bdb3b2f2 100644
--- a/python/langsmith/client.py
+++ b/python/langsmith/client.py
@@ -418,14 +418,17 @@ def __init__(
         self.session = session if session else requests.Session()
         weakref.finalize(self, close_session, self.session)
         # Initialize auto batching
-        self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()
-
-        threading.Thread(
-            target=_tracing_control_thread_func,
-            # arg must be a weakref to self to avoid the Thread object
-            # preventing garbage collection of the Client object
-            args=(weakref.ref(self), auto_batch_tracing),
-        ).start()
+        if auto_batch_tracing:
+            self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()
+
+            threading.Thread(
+                target=_tracing_control_thread_func,
+                # arg must be a weakref to self to avoid the Thread object
+                # preventing garbage collection of the Client object
+                args=(weakref.ref(self),),
+            ).start()
+        else:
+            self.tracing_queue = None
 
         # Mount the HTTPAdapter with the retry configuration
         adapter = requests_adapters.HTTPAdapter(max_retries=self.retry_config)
@@ -644,7 +647,7 @@ def request_with_retries(
             recommendation = (
                 "Please confirm your LANGCHAIN_ENDPOINT"
                 if self.api_url != "https://api.smith.langchain.com"
-                else "Please confirm your internet connection"
+                else "Please confirm your internet connection."
             )
             raise ls_utils.LangSmithConnectionError(
                 f"Connection error caused failure to {request_method} {url}"
@@ -3879,18 +3882,11 @@ def _tracing_thread_handle_batch(
     client: Client,
     tracing_queue: Queue,
     batch: List[TracingQueueItem],
-    auto_batch_tracing: bool = True,
 ) -> None:
     create = [it.item for it in batch if it.action == "create"]
     update = [it.item for it in batch if it.action == "update"]
     try:
-        if auto_batch_tracing:
-            client.batch_ingest_runs(create=create, update=update, pre_sampled=True)
-        else:
-            for run_create in create:
-                client._create_run(run_create)
-            for run_update in update:
-                client._update_run(run_update)
+        client.batch_ingest_runs(create=create, update=update, pre_sampled=True)
     except Exception:
         logger.error("Error in tracing queue", exc_info=True)
         # exceptions are logged elsewhere, but we need to make sure the
@@ -3926,9 +3922,7 @@ def _ensure_ingest_config(
     return default_config
 
 
-def _tracing_control_thread_func(
-    client_ref: weakref.ref[Client], auto_batch_tracing: bool
-) -> None:
+def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
     client = client_ref()
     if client is None:
         return
@@ -3959,25 +3953,21 @@
         ):
             new_thread = threading.Thread(
                 target=_tracing_sub_thread_func,
-                args=(weakref.ref(client), auto_batch_tracing),
+                args=(weakref.ref(client),),
             )
             sub_threads.append(new_thread)
             new_thread.start()
         if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit):
-            _tracing_thread_handle_batch(
-                client, tracing_queue, next_batch, auto_batch_tracing=auto_batch_tracing
-            )
+            _tracing_thread_handle_batch(client, tracing_queue, next_batch)
     # drain the queue on exit
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False
     ):
-        _tracing_thread_handle_batch(
-            client, tracing_queue, next_batch, auto_batch_tracing=auto_batch_tracing
-        )
+        _tracing_thread_handle_batch(client, tracing_queue, next_batch)
 
 
 def _tracing_sub_thread_func(
-    client_ref: weakref.ref[Client], auto_batch_tracing: bool
+    client_ref: weakref.ref[Client],
 ) -> None:
     client = client_ref()
     if client is None:
         return
@@ -4004,9 +3994,7 @@
     ):
         if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit):
            seen_successive_empty_queues = 0
-            _tracing_thread_handle_batch(
-                client, tracing_queue, next_batch, auto_batch_tracing=auto_batch_tracing
-            )
+            _tracing_thread_handle_batch(client, tracing_queue, next_batch)
         else:
             seen_successive_empty_queues += 1
 
@@ -4014,6 +4002,4 @@
     while next_batch := _tracing_thread_drain_queue(
         tracing_queue, limit=size_limit, block=False
     ):
-        _tracing_thread_handle_batch(
-            client, tracing_queue, next_batch, auto_batch_tracing=auto_batch_tracing
-        )
+        _tracing_thread_handle_batch(client, tracing_queue, next_batch)
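
Not part of the patch: a minimal usage sketch of the constructor behavior this change restores, assuming the langsmith package is installed and API credentials are configured in the environment. With auto_batch_tracing=True the Client creates a tracing queue and starts the background control thread; with auto_batch_tracing=False, tracing_queue is left as None and no thread is started.

    # Hypothetical usage sketch, not included in the diff above.
    from langsmith import Client

    batching_client = Client(auto_batch_tracing=True)
    assert batching_client.tracing_queue is not None  # queue created, control thread started

    plain_client = Client(auto_batch_tracing=False)
    assert plain_client.tracing_queue is None  # no queue, no background thread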