Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Use Blockbuster to detect blocking calls in asyncio during tests #29043

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions libs/core/langchain_core/beta/runnables/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,11 @@ def _key_from_id(id_: str) -> str:

def _config_with_context(
config: RunnableConfig,
steps: list[Runnable],
context_specs: list[tuple[ConfigurableFieldSpec, int]],
setter: Callable,
getter: Callable,
event_cls: Union[type[threading.Event], type[asyncio.Event]],
) -> RunnableConfig:
if any(k.startswith(CONTEXT_CONFIG_PREFIX) for k in config.get("configurable", {})):
return config

context_specs = [
(spec, i)
for i, step in enumerate(steps)
for spec in step.config_specs
if spec.id.startswith(CONTEXT_CONFIG_PREFIX)
]
grouped_by_key = {
key: list(group)
for key, group in groupby(
Expand Down Expand Up @@ -121,7 +112,7 @@ def _config_with_context(
return patch_config(config, configurable=context_funcs)


def aconfig_with_context(
async def aconfig_with_context(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically a breaking change, but probably okay — it looks a lot like a private function to me.

Would you mind adding a comment about why this needs to be async (i.e., is inspect.getsource making OS calls)?

Copy link
Collaborator Author

@cbornet cbornet Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, inspect.getsource makes os.stat calls to check whether a source file has changed since it was stored in the linecache, and FS read calls to get the code if the linecache needs to be updated (which happens at least on the first access).
Note that an LRU cache was added, probably because these OS calls have an impact on perf (see #28131).
Thinking about it, it may be cleaner to have an aconfig_specs in Runnable that defaults to calling config_specs (not in a thread) and that we can override to use a thread for RunnableLambda. WDYT?

config: RunnableConfig,
steps: list[Runnable],
) -> RunnableConfig:
Expand All @@ -134,7 +125,18 @@ def aconfig_with_context(
Returns:
The patched runnable config.
"""
return _config_with_context(config, steps, _asetter, _agetter, asyncio.Event)
if any(k.startswith(CONTEXT_CONFIG_PREFIX) for k in config.get("configurable", {})):
return config

context_specs = [
(spec, i)
for i, step in enumerate(steps)
for spec in await asyncio.to_thread(getattr, step, "config_specs")
if spec.id.startswith(CONTEXT_CONFIG_PREFIX)
]
return _config_with_context(
config, context_specs, _asetter, _agetter, asyncio.Event
)


def config_with_context(
Expand All @@ -150,7 +152,18 @@ def config_with_context(
Returns:
The patched runnable config.
"""
return _config_with_context(config, steps, _setter, _getter, threading.Event)
if any(k.startswith(CONTEXT_CONFIG_PREFIX) for k in config.get("configurable", {})):
return config

context_specs = [
(spec, i)
for i, step in enumerate(steps)
for spec in step.config_specs
if spec.id.startswith(CONTEXT_CONFIG_PREFIX)
]
return _config_with_context(
config, context_specs, _setter, _getter, threading.Event
)


@beta()
Expand Down
6 changes: 3 additions & 3 deletions libs/core/langchain_core/runnables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3037,7 +3037,7 @@ async def ainvoke(
from langchain_core.beta.runnables.context import aconfig_with_context

# setup callbacks and context
config = aconfig_with_context(ensure_config(config), self.steps)
config = await aconfig_with_context(ensure_config(config), self.steps)
callback_manager = get_async_callback_manager_for_config(config)
# start the root run
run_manager = await callback_manager.on_chain_start(
Expand Down Expand Up @@ -3214,7 +3214,7 @@ async def abatch(

# setup callbacks and context
configs = [
aconfig_with_context(c, self.steps)
await aconfig_with_context(c, self.steps)
for c in get_config_list(config, len(inputs))
]
callback_managers = [
Expand Down Expand Up @@ -3364,7 +3364,7 @@ async def _atransform(
from langchain_core.beta.runnables.context import aconfig_with_context

steps = [self.first] + self.middle + [self.last]
config = aconfig_with_context(config, self.steps)
config = await aconfig_with_context(config, self.steps)

# stream the last steps
# transform the input stream of each step with the next
Expand Down
32 changes: 28 additions & 4 deletions libs/core/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libs/core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ grandalf = "^0.8"
responses = "^0.25.0"
pytest-socket = "^0.7.0"
pytest-xdist = "^3.6.1"
blockbuster = "~1.5.9"
[[tool.poetry.group.test.dependencies.numpy]]
version = "^1.24.0"
python = "<3.12"
Expand Down
26 changes: 25 additions & 1 deletion libs/core/tests/unit_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
"""Configuration for unit tests."""

from collections.abc import Sequence
from collections.abc import Iterator, Sequence
from importlib import util
from uuid import UUID

import pytest
from blockbuster import BlockBuster, blockbuster_ctx
from pytest import Config, Function, Parser
from pytest_mock import MockerFixture


@pytest.fixture(autouse=True)
def blockbuster() -> Iterator[BlockBuster]:
with blockbuster_ctx() as bb:
for func in ["os.stat", "os.path.abspath"]:
(
bb.functions[func]
.can_block_in("langchain_core/_api/internal.py", "is_caller_internal")
.can_block_in("langchain_core/runnables/base.py", "__repr__")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RunnableLambda's __repr__ calls get_lambda_source, which is blocking. It should probably be cached.

)

for func in ["os.stat", "io.TextIOWrapper.read"]:
bb.functions[func].can_block_in(
"langsmith/client.py", "_default_retry_config"
)

for bb_function in bb.functions.values():
bb_function.can_block_in(
"freezegun/api.py", "_get_cached_module_attributes"
)

yield bb


def pytest_addoption(parser: Parser) -> None:
"""Add custom command line options to pytest."""
parser.addoption(
Expand Down
7 changes: 6 additions & 1 deletion libs/core/tests/unit_tests/fake/test_fake_chat_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,12 @@ async def on_llm_new_token(
model = GenericFakeChatModel(messages=infinite_cycle)
tokens: list[str] = []
# New model
results = list(model.stream("meow", {"callbacks": [MyCustomAsyncHandler(tokens)]}))
results = [
chunk
async for chunk in model.astream(
"meow", {"callbacks": [MyCustomAsyncHandler(tokens)]}
)
]
assert results == [
_any_id_ai_message_chunk(content="hello"),
_any_id_ai_message_chunk(content=" "),
Expand Down
59 changes: 37 additions & 22 deletions libs/core/tests/unit_tests/language_models/chat_models/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.outputs.llm_result import LLMResult
from langchain_core.tracers import LogStreamCallbackHandler
from langchain_core.tracers.base import BaseTracer
from langchain_core.tracers.context import collect_runs
from langchain_core.tracers.event_stream import _AstreamEventsCallbackHandler
Expand Down Expand Up @@ -304,39 +305,48 @@ def _stream(


@pytest.mark.parametrize("disable_streaming", [True, False, "tool_calling"])
async def test_disable_streaming(
def test_disable_streaming(
disable_streaming: Union[bool, Literal["tool_calling"]],
) -> None:
model = StreamingModel(disable_streaming=disable_streaming)
assert model.invoke([]).content == "invoke"
assert (await model.ainvoke([])).content == "invoke"

expected = "invoke" if disable_streaming is True else "stream"
assert next(model.stream([])).content == expected
async for c in model.astream([]):
assert c.content == expected
break
assert (
model.invoke([], config={"callbacks": [LogStreamCallbackHandler()]}).content
== expected
)

expected = "invoke" if disable_streaming in ("tool_calling", True) else "stream"
assert next(model.stream([], tools=[{"type": "function"}])).content == expected
assert (
model.invoke(
[], config={"callbacks": [_AstreamEventsCallbackHandler()]}
[], config={"callbacks": [LogStreamCallbackHandler()]}, tools=[{}]
).content
== expected
)


@pytest.mark.parametrize("disable_streaming", [True, False, "tool_calling"])
async def test_disable_streaming_async(
disable_streaming: Union[bool, Literal["tool_calling"]],
) -> None:
model = StreamingModel(disable_streaming=disable_streaming)
assert (await model.ainvoke([])).content == "invoke"

expected = "invoke" if disable_streaming is True else "stream"
async for c in model.astream([]):
assert c.content == expected
break
assert (
await model.ainvoke([], config={"callbacks": [_AstreamEventsCallbackHandler()]})
).content == expected

expected = "invoke" if disable_streaming in ("tool_calling", True) else "stream"
assert next(model.stream([], tools=[{"type": "function"}])).content == expected
async for c in model.astream([], tools=[{}]):
assert c.content == expected
break
assert (
model.invoke(
[], config={"callbacks": [_AstreamEventsCallbackHandler()]}, tools=[{}]
).content
== expected
)
assert (
await model.ainvoke(
[], config={"callbacks": [_AstreamEventsCallbackHandler()]}, tools=[{}]
Expand All @@ -345,26 +355,31 @@ async def test_disable_streaming(


@pytest.mark.parametrize("disable_streaming", [True, False, "tool_calling"])
async def test_disable_streaming_no_streaming_model(
def test_disable_streaming_no_streaming_model(
disable_streaming: Union[bool, Literal["tool_calling"]],
) -> None:
model = NoStreamingModel(disable_streaming=disable_streaming)
assert model.invoke([]).content == "invoke"
assert (await model.ainvoke([])).content == "invoke"
assert next(model.stream([])).content == "invoke"
async for c in model.astream([]):
assert c.content == "invoke"
break
assert (
model.invoke(
[], config={"callbacks": [_AstreamEventsCallbackHandler()]}
).content
model.invoke([], config={"callbacks": [LogStreamCallbackHandler()]}).content
== "invoke"
)
assert next(model.stream([], tools=[{}])).content == "invoke"


@pytest.mark.parametrize("disable_streaming", [True, False, "tool_calling"])
async def test_disable_streaming_no_streaming_model_async(
disable_streaming: Union[bool, Literal["tool_calling"]],
) -> None:
model = NoStreamingModel(disable_streaming=disable_streaming)
assert (await model.ainvoke([])).content == "invoke"
async for c in model.astream([]):
assert c.content == "invoke"
break
assert (
await model.ainvoke([], config={"callbacks": [_AstreamEventsCallbackHandler()]})
).content == "invoke"
assert next(model.stream([], tools=[{}])).content == "invoke"
async for c in model.astream([], tools=[{}]):
assert c.content == "invoke"
break
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import time
from typing import Optional as Optional

import pytest
from blockbuster import BlockBuster

from langchain_core.caches import InMemoryCache
from langchain_core.language_models import GenericFakeChatModel
from langchain_core.rate_limiters import InMemoryRateLimiter


@pytest.fixture(autouse=True)
def deactivate_blockbuster(blockbuster: BlockBuster) -> None:
# Deactivate BlockBuster to not disturb the rate limiter timings
blockbuster.deactivate()


def test_rate_limit_invoke() -> None:
"""Add rate limiter."""

Expand Down
Loading
Loading