Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Use Blockbuster to detect blocking calls in asyncio during tests #29043

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

cbornet
Copy link
Collaborator

@cbornet cbornet commented Jan 6, 2025

This PR uses the blockbuster library in langchain-core to detect blocking calls made in the asyncio event loop during unit tests.
Avoiding blocking calls is hard as these can be deeply buried in the code or made in 3rd party libraries.
Blockbuster makes it easier to detect them by raising an exception when a call is made to a known blocking function (eg: time.sleep).

Adding blockbuster allowed to find a blocking call in aconfig_with_context (it ends up calling get_function_nonlocals which loads function code).

Dependencies:

  • blockbuster (test)

Twitter handle: cbornet_

Copy link

vercel bot commented Jan 6, 2025

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
langchain ⬜️ Ignored (Inspect) Visit Preview Jan 13, 2025 3:22pm

@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Jan 6, 2025
@cbornet cbornet force-pushed the blockbuster branch 2 times, most recently from 0983c3d to 7ce4b18 Compare January 6, 2025 15:10

with tempfile.NamedTemporaryFile(delete=True, suffix=".jpg") as temp_file:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's useless to use a real file here.

"""Test invoking nested runnable lambda."""
blockbuster.deactivate()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code makes a sync call from async on purpose...

Copy link
Collaborator

@eyurtsev eyurtsev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. Should we enable/disable for unit tests?

I like that it forces us to separate async tests from sync tests and avoid being lazy, but at the same time there's some changes that aren't technically necessary and seem to unnecessarily complicate the testing code (e.g., updating a file open to a non blocking request)?

@@ -121,7 +121,7 @@ def _config_with_context(
return patch_config(config, configurable=context_funcs)


def aconfig_with_context(
async def aconfig_with_context(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically breaking change, but probably okay looks a lot like a private function to me.

Would you mind adding a comment about why this needs to be async (i.e., is inspect.get_source is making os calls?)

Copy link
Collaborator Author

@cbornet cbornet Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, inspect.get_source makes os.stat calls to check if a source file has been updated from the linecache and FS read calls to get the code if the linecache needs to be updated (done at least at the first access).
Note that an LRU cache was added probably because these os calls have an impact on perf ?#28131
Thinking about it, it may be cleaner to have a aconfig_specs in Runnable that defaults to calling config_specs (not in a thread) and that we can override to use a thread for RunnableLambda. WDYT ?

@@ -90,9 +91,11 @@ async def test_inmemory_dump_load(tmp_path: Path) -> None:
output = await store.asimilarity_search("foo", k=1)

test_file = str(tmp_path / "test.json")
store.dump(test_file)
await asyncio.to_thread(store.dump, test_file)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like a false positive for the test? it doesn't really matter whether this is using blocking or non blocking code here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, with the way blockbuster is configured here (activated before the test, deativated after), the test code itself needs to be non-blocking.
Maybe we could add aload/adump methods to InMemoryVectorStore using aiofile/aiofiles?

@@ -298,17 +299,17 @@ def parent(a: int) -> int:
# Now run the chain and check the resulting posts
cb = [tracer]
if method == "invoke":
res: Any = parent.invoke(1, {"callbacks": cb}) # type: ignore
res: Any = await asyncio.to_thread(parent.invoke, 1, {"callbacks": cb}) # type: ignore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any benefit to doing this? feel like it's complicating the test code?

Copy link
Collaborator Author

@cbornet cbornet Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored this part to cleanly separate sync and async tests: see c2162c2
With this the asyncio.to_thread calls are not needed.

@dosubot dosubot bot added size:XL This PR changes 500-999 lines, ignoring generated files. and removed size:L This PR changes 100-499 lines, ignoring generated files. labels Jan 9, 2025
@cbornet cbornet force-pushed the blockbuster branch 5 times, most recently from 188abdf to 4598e0d Compare January 10, 2025 17:27
Copy link
Collaborator

@eyurtsev eyurtsev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK makes sense -- I'll finish reviewing the test changes on Monday and will merge.


If there were a way to configure block buster to fail based on the stack trace (e.g., we mostly care about async calls to langchain APIs) , it'll probably make it more valuable (or at least less effort to adopt widely)

test_foo.py

async def test1():
  ---
   with open(..) as f: # blocking call but we don't care in 99% of the time
    ...
   ---- 
   # Blocking call from calling sync method in async test.. we probably care about it
   # but there might be some exceptions
   runnable.invoke()  

  # blocking call from an async api
  # we care about this 100% of the time
  await  some_langchain_thing() # results in a blocking call

@cbornet cbornet force-pushed the blockbuster branch 2 times, most recently from 3a082ce to e5c91e0 Compare January 13, 2025 13:12
@cbornet cbornet force-pushed the blockbuster branch 3 times, most recently from 48dd730 to 3c8ab33 Compare January 13, 2025 15:11
@cbornet
Copy link
Collaborator Author

cbornet commented Jan 13, 2025

If there were a way to configure block buster to fail based on the stack trace (e.g., we mostly care about async calls to langchain APIs) , it'll probably make it more valuable (or at least less effort to adopt widely)

I can have a look at that but this means inspecting the full stack to search for langchain_core files. This is a slow operation and I don't know if it will be performant enough.

(
bb.functions[func]
.can_block_in("langchain_core/_api/internal.py", "is_caller_internal")
.can_block_in("langchain_core/runnables/base.py", "__repr__")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RunnableLambda's __repr__ calls get_lambda_source which is blocking. It should probably be cached.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size:XL This PR changes 500-999 lines, ignoring generated files.
Projects
Status: In review
Development

Successfully merging this pull request may close these issues.

2 participants