Skip to content

Commit

Permalink
noreg
Browse files Browse the repository at this point in the history
  • Loading branch information
hinthornw committed Mar 5, 2024
1 parent 18b6ca2 commit 170df57
Showing 1 changed file with 20 additions and 34 deletions.
54 changes: 20 additions & 34 deletions python/langsmith/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,14 +418,17 @@ def __init__(
self.session = session if session else requests.Session()
weakref.finalize(self, close_session, self.session)
# Initialize auto batching
self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()

threading.Thread(
target=_tracing_control_thread_func,
# arg must be a weakref to self to avoid the Thread object
# preventing garbage collection of the Client object
args=(weakref.ref(self), auto_batch_tracing),
).start()
if auto_batch_tracing:
self.tracing_queue: Optional[PriorityQueue] = PriorityQueue()

threading.Thread(
target=_tracing_control_thread_func,
# arg must be a weakref to self to avoid the Thread object
# preventing garbage collection of the Client object
args=(weakref.ref(self),),
).start()
else:
self.tracing_queue = None

# Mount the HTTPAdapter with the retry configuration
adapter = requests_adapters.HTTPAdapter(max_retries=self.retry_config)
Expand Down Expand Up @@ -644,7 +647,7 @@ def request_with_retries(
recommendation = (
"Please confirm your LANGCHAIN_ENDPOINT"
if self.api_url != "https://api.smith.langchain.com"
else "Please confirm your internet connection"
else "Please confirm your internet connection."
)
raise ls_utils.LangSmithConnectionError(
f"Connection error caused failure to {request_method} {url}"
Expand Down Expand Up @@ -3879,18 +3882,11 @@ def _tracing_thread_handle_batch(
client: Client,
tracing_queue: Queue,
batch: List[TracingQueueItem],
auto_batch_tracing: bool = True,
) -> None:
create = [it.item for it in batch if it.action == "create"]
update = [it.item for it in batch if it.action == "update"]
try:
if auto_batch_tracing:
client.batch_ingest_runs(create=create, update=update, pre_sampled=True)
else:
for run_create in create:
client._create_run(run_create)
for run_update in update:
client._update_run(run_update)
client.batch_ingest_runs(create=create, update=update, pre_sampled=True)
except Exception:
logger.error("Error in tracing queue", exc_info=True)
# exceptions are logged elsewhere, but we need to make sure the
Expand Down Expand Up @@ -3926,9 +3922,7 @@ def _ensure_ingest_config(
return default_config


def _tracing_control_thread_func(
client_ref: weakref.ref[Client], auto_batch_tracing: bool
) -> None:
def _tracing_control_thread_func(client_ref: weakref.ref[Client]) -> None:
client = client_ref()
if client is None:
return
Expand Down Expand Up @@ -3959,25 +3953,21 @@ def _tracing_control_thread_func(
):
new_thread = threading.Thread(
target=_tracing_sub_thread_func,
args=(weakref.ref(client), auto_batch_tracing),
args=(weakref.ref(client),),
)
sub_threads.append(new_thread)
new_thread.start()
if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit):
_tracing_thread_handle_batch(
client, tracing_queue, next_batch, auto_batch_tracing=auto_batch_tracing
)
_tracing_thread_handle_batch(client, tracing_queue, next_batch)
# drain the queue on exit
while next_batch := _tracing_thread_drain_queue(
tracing_queue, limit=size_limit, block=False
):
_tracing_thread_handle_batch(
client, tracing_queue, next_batch, auto_batch_tracing=auto_batch_tracing
)
_tracing_thread_handle_batch(client, tracing_queue, next_batch)


def _tracing_sub_thread_func(
client_ref: weakref.ref[Client], auto_batch_tracing: bool
client_ref: weakref.ref[Client],
) -> None:
client = client_ref()
if client is None:
Expand All @@ -4004,16 +3994,12 @@ def _tracing_sub_thread_func(
):
if next_batch := _tracing_thread_drain_queue(tracing_queue, limit=size_limit):
seen_successive_empty_queues = 0
_tracing_thread_handle_batch(
client, tracing_queue, next_batch, auto_batch_tracing=auto_batch_tracing
)
_tracing_thread_handle_batch(client, tracing_queue, next_batch)
else:
seen_successive_empty_queues += 1

# drain the queue on exit
while next_batch := _tracing_thread_drain_queue(
tracing_queue, limit=size_limit, block=False
):
_tracing_thread_handle_batch(
client, tracing_queue, next_batch, auto_batch_tracing=auto_batch_tracing
)
_tracing_thread_handle_batch(client, tracing_queue, next_batch)

0 comments on commit 170df57

Please sign in to comment.