Use batched tracing in sdk #16305

Merged: 15 commits, Feb 7, 2024
54 changes: 20 additions & 34 deletions libs/core/langchain_core/tracers/langchain.py
@@ -2,10 +2,9 @@
 from __future__ import annotations
 
 import logging
-import weakref
-from concurrent.futures import Future, ThreadPoolExecutor, wait
+from concurrent.futures import ThreadPoolExecutor
 from datetime import datetime, timezone
-from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
 from uuid import UUID
 
 from langsmith import Client
@@ -27,7 +26,6 @@
 
 logger = logging.getLogger(__name__)
 _LOGGED = set()
-_TRACERS: weakref.WeakSet[LangChainTracer] = weakref.WeakSet()
 _CLIENT: Optional[Client] = None
 _EXECUTOR: Optional[ThreadPoolExecutor] = None
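With the WeakSet of tracers gone, the module coordinates through a single shared client instead. A sketch of the singleton pattern behind `get_client()`, hedged because the function body is collapsed in this view; this is illustrative, not the exact code:

```python
# Illustrative sketch (not the exact collapsed code) of the module-level
# singleton that get_client() maintains: every tracer shares one Client,
# so every run feeds one batched tracing queue.
from typing import Optional

from langsmith import Client

_CLIENT: Optional[Client] = None


def get_client() -> Client:
    """Return the shared langsmith Client, creating it on first use."""
    global _CLIENT
    if _CLIENT is None:
        _CLIENT = Client()
    return _CLIENT
```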

@@ -43,10 +41,9 @@ def log_error_once(method: str, exception: Exception) -> None:
 
 def wait_for_all_tracers() -> None:
     """Wait for all tracers to finish."""
-    global _TRACERS
-    for tracer in list(_TRACERS):
-        if tracer is not None:
-            tracer.wait_for_futures()
+    global _CLIENT
+    if _CLIENT is not None and _CLIENT.tracing_queue is not None:
+        _CLIENT.tracing_queue.join()
 
 
 def get_client() -> Client:
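Flushing no longer walks a registry of tracer instances and waits on their futures; it joins the shared client's queue. A hedged usage sketch, assuming tracing environment variables are configured and with a placeholder runnable standing in for any traced chain:

```python
# Hedged usage sketch: flush pending traces before a short-lived script exits.
# LANGCHAIN_TRACING_V2 / LANGCHAIN_API_KEY are assumed to be configured, and
# `chain` stands in for any traced runnable.
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.langchain import wait_for_all_tracers

chain = RunnableLambda(lambda x: x["question"].upper())  # placeholder runnable

try:
    chain.invoke({"question": "hello"})
finally:
    # Blocks until the langsmith client's background queue has drained,
    # so no run payloads are dropped at interpreter shutdown.
    wait_for_all_tracers()
```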
@@ -74,7 +71,6 @@ def __init__(
         project_name: Optional[str] = None,
         client: Optional[Client] = None,
         tags: Optional[List[str]] = None,
-        use_threading: bool = True,
         **kwargs: Any,
     ) -> None:
         """Initialize the LangChain tracer."""
@@ -84,12 +80,8 @@
         )
         self.project_name = project_name or ls_utils.get_tracer_project()
         self.client = client or get_client()
-        self._futures: weakref.WeakSet[Future] = weakref.WeakSet()
         self.tags = tags or []
-        self.executor = _get_executor() if use_threading else None
         self.latest_run: Optional[Run] = None
-        global _TRACERS
-        _TRACERS.add(self)
 
     def on_chat_model_start(
         self,
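With `use_threading` removed from the signature, callers construct the tracer with just project, client, and tags; batching now lives inside the langsmith Client rather than in a per-tracer executor. A hedged construction sketch with illustrative values:

```python
# Hedged sketch of constructing the tracer after this PR; note there is no
# use_threading flag anymore. Project name and tags are illustrative.
from langsmith import Client

from langchain_core.tracers.langchain import LangChainTracer

tracer = LangChainTracer(
    project_name="my-project",  # illustrative project name
    client=Client(),            # omit to fall back to the shared get_client()
    tags=["experiment-1"],      # illustrative tags
)
```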
@@ -181,75 +173,69 @@ def _update_run_single(self, run: Run) -> None:
             log_error_once("patch", e)
             raise
 
-    def _submit(self, function: Callable[[Run], None], run: Run) -> None:
-        """Submit a function to the executor."""
-        if self.executor is None:
-            function(run)
-        else:
-            self._futures.add(self.executor.submit(function, run))
-
     def _on_llm_start(self, run: Run) -> None:
         """Persist an LLM run."""
         if run.parent_run_id is None:
             run.reference_example_id = self.example_id
-        self._submit(self._persist_run_single, run)
+        self._persist_run_single(run)
 
     def _on_chat_model_start(self, run: Run) -> None:
         """Persist an LLM run."""
         if run.parent_run_id is None:
             run.reference_example_id = self.example_id
-        self._submit(self._persist_run_single, run)
+        self._persist_run_single(run)
 
     def _on_llm_end(self, run: Run) -> None:
         """Process the LLM Run."""
-        self._submit(self._update_run_single, run)
+        self._update_run_single(run)
 
     def _on_llm_error(self, run: Run) -> None:
         """Process the LLM Run upon error."""
-        self._submit(self._update_run_single, run)
+        self._update_run_single(run)
 
     def _on_chain_start(self, run: Run) -> None:
         """Process the Chain Run upon start."""
         if run.parent_run_id is None:
             run.reference_example_id = self.example_id
-        self._submit(self._persist_run_single, run)
+        self._persist_run_single(run)
 
     def _on_chain_end(self, run: Run) -> None:
         """Process the Chain Run."""
-        self._submit(self._update_run_single, run)
+        self._update_run_single(run)
 
     def _on_chain_error(self, run: Run) -> None:
         """Process the Chain Run upon error."""
-        self._submit(self._update_run_single, run)
+        self._update_run_single(run)
 
     def _on_tool_start(self, run: Run) -> None:
         """Process the Tool Run upon start."""
         if run.parent_run_id is None:
             run.reference_example_id = self.example_id
-        self._submit(self._persist_run_single, run)
+        self._persist_run_single(run)
 
     def _on_tool_end(self, run: Run) -> None:
         """Process the Tool Run."""
-        self._submit(self._update_run_single, run)
+        self._update_run_single(run)
 
     def _on_tool_error(self, run: Run) -> None:
         """Process the Tool Run upon error."""
-        self._submit(self._update_run_single, run)
+        self._update_run_single(run)
 
     def _on_retriever_start(self, run: Run) -> None:
         """Process the Retriever Run upon start."""
         if run.parent_run_id is None:
             run.reference_example_id = self.example_id
-        self._submit(self._persist_run_single, run)
+        self._persist_run_single(run)
 
     def _on_retriever_end(self, run: Run) -> None:
         """Process the Retriever Run."""
-        self._submit(self._update_run_single, run)
+        self._update_run_single(run)
 
     def _on_retriever_error(self, run: Run) -> None:
         """Process the Retriever Run upon error."""
-        self._submit(self._update_run_single, run)
+        self._update_run_single(run)
 
     def wait_for_futures(self) -> None:
         """Wait for the given futures to complete."""
-        wait(self._futures)
+        if self.client is not None and self.client.tracing_queue is not None:
+            self.client.tracing_queue.join()
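Taken together: the tracer no longer fans each run out to a ThreadPoolExecutor future. Persisting and updating runs now enqueue work on the langsmith client's batched queue, and `wait_for_futures()` reduces to a queue join. An illustrative sketch of that producer/consumer pattern, which is not the SDK's actual implementation:

```python
# Illustrative sketch of the batching pattern this PR delegates to the
# langsmith SDK; NOT the SDK's actual code. A daemon thread drains a queue
# in batches, and join() provides the flush semantics the tracer now gets
# from client.tracing_queue.join().
import queue
import threading

tracing_queue: "queue.Queue[dict]" = queue.Queue()


def send_batch(batch: list) -> None:
    """Placeholder for the HTTP call posting a batch of runs to LangSmith."""
    print(f"posting {len(batch)} runs in one request")


def worker() -> None:
    while True:
        batch = [tracing_queue.get()]
        # Opportunistically drain more items so many runs share one request.
        while len(batch) < 100:
            try:
                batch.append(tracing_queue.get_nowait())
            except queue.Empty:
                break
        send_batch(batch)
        for _ in batch:
            tracing_queue.task_done()


threading.Thread(target=worker, daemon=True).start()

# Producers enqueue run payloads instead of issuing one POST per run:
tracing_queue.put({"id": "...", "name": "my_run"})
tracing_queue.join()  # blocks until every enqueued run has been processed
```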
29 changes: 25 additions & 4 deletions libs/core/poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion libs/core/pyproject.toml
@@ -11,7 +11,7 @@ repository = "https://github.com/langchain-ai/langchain"
 [tool.poetry.dependencies]
 python = ">=3.8.1,<4.0"
 pydantic = ">=1,<3"
-langsmith = ">=0.0.83,<0.1"
+langsmith = "^0.0.87"
 tenacity = "^8.1.0"
 jsonpatch = "^1.33"
 anyio = ">=3,<5"
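The floor moves from 0.0.83 to 0.0.87, presumably the first langsmith release whose `Client` exposes the `tracing_queue` the tracer now joins. A hedged runtime check for that assumption:

```python
# Hedged sketch: fail fast when the installed langsmith predates the batched
# tracing queue this PR relies on. Client() reads LANGCHAIN_API_KEY and
# related settings from the environment.
from langsmith import Client

client = Client()
if not hasattr(client, "tracing_queue"):
    raise RuntimeError(
        "langsmith is too old for batched tracing; upgrade to >=0.0.87"
    )
```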
1 change: 0 additions & 1 deletion libs/langchain/langchain/smith/evaluation/runner_utils.py
@@ -1145,7 +1145,6 @@ def prepare(
            LangChainTracer(
                project_name=project.name,
                client=client,
-               use_threading=False,
                example_id=example.id,
            ),
            EvaluatorCallbackHandler(
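The evaluation runner previously passed `use_threading=False` so that `_submit` ran persistence synchronously and runs were stored before results were read; with batching, it constructs the tracer plainly and flushing happens by joining the shared queue. A hedged sketch of the resulting pattern, where `run_my_evaluation` is a placeholder and `project`, `client`, and `example` come from the surrounding `prepare()` scope:

```python
from langchain_core.tracers.langchain import LangChainTracer, wait_for_all_tracers

tracer = LangChainTracer(
    project_name=project.name,  # from the surrounding prepare() scope
    client=client,
    example_id=example.id,
)
run_my_evaluation(callbacks=[tracer])  # placeholder evaluation call
wait_for_all_tracers()  # join the shared queue so every run is persisted
```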