Skip to content

Commit

Permalink
lib: imperative api: Generators use yield to publish stream_mode=cust…
Browse files Browse the repository at this point in the history
…om events
  • Loading branch information
nfcampos committed Dec 9, 2024
1 parent b37c9d8 commit e0a0958
Showing 1 changed file with 26 additions and 3 deletions.
29 changes: 26 additions & 3 deletions libs/langgraph/langgraph/func/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio

Check notice on line 1 in libs/langgraph/langgraph/func/__init__.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

......................................... fanout_to_subgraph_10x: Mean +- std dev: 63.4 ms +- 2.0 ms ......................................... fanout_to_subgraph_10x_sync: Mean +- std dev: 53.3 ms +- 0.8 ms ......................................... fanout_to_subgraph_10x_checkpoint: Mean +- std dev: 96.2 ms +- 9.4 ms ......................................... fanout_to_subgraph_10x_checkpoint_sync: Mean +- std dev: 94.9 ms +- 0.8 ms ......................................... fanout_to_subgraph_100x: Mean +- std dev: 649 ms +- 32 ms ......................................... fanout_to_subgraph_100x_sync: Mean +- std dev: 519 ms +- 8 ms ......................................... fanout_to_subgraph_100x_checkpoint: Mean +- std dev: 1.01 sec +- 0.05 sec ......................................... fanout_to_subgraph_100x_checkpoint_sync: Mean +- std dev: 956 ms +- 19 ms ......................................... react_agent_10x: Mean +- std dev: 31.1 ms +- 0.6 ms ......................................... react_agent_10x_sync: Mean +- std dev: 22.8 ms +- 0.3 ms ......................................... react_agent_10x_checkpoint: Mean +- std dev: 47.0 ms +- 0.8 ms ......................................... react_agent_10x_checkpoint_sync: Mean +- std dev: 36.6 ms +- 0.4 ms ......................................... react_agent_100x: Mean +- std dev: 350 ms +- 7 ms ......................................... react_agent_100x_sync: Mean +- std dev: 276 ms +- 3 ms ......................................... react_agent_100x_checkpoint: Mean +- std dev: 947 ms +- 22 ms ......................................... react_agent_100x_checkpoint_sync: Mean +- std dev: 834 ms +- 14 ms ......................................... wide_state_25x300: Mean +- std dev: 23.8 ms +- 0.6 ms ......................................... wide_state_25x300_sync: Mean +- std dev: 15.0 ms +- 0.2 ms ......................................... wide_state_25x300_checkpoint: Mean +- std dev: 290 ms +- 16 ms ......................................... wide_state_25x300_checkpoint_sync: Mean +- std dev: 277 ms +- 16 ms ......................................... wide_state_15x600: Mean +- std dev: 27.6 ms +- 0.4 ms ......................................... wide_state_15x600_sync: Mean +- std dev: 17.4 ms +- 0.2 ms ......................................... wide_state_15x600_checkpoint: Mean +- std dev: 486 ms +- 15 ms ......................................... wide_state_15x600_checkpoint_sync: Mean +- std dev: 474 ms +- 15 ms ......................................... wide_state_9x1200: Mean +- std dev: 27.8 ms +- 0.5 ms ......................................... wide_state_9x1200_sync: Mean +- std dev: 17.5 ms +- 0.3 ms ......................................... wide_state_9x1200_checkpoint: Mean +- std dev: 320 ms +- 13 ms ......................................... wide_state_9x1200_checkpoint_sync: Mean +- std dev: 304 ms +- 13 ms

Check notice on line 1 in libs/langgraph/langgraph/func/__init__.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+-----------------------------------------+---------+------------------------+ | Benchmark | main | changes | +=========================================+=========+========================+ | react_agent_10x_checkpoint_sync | 36.5 ms | 36.6 ms: 1.00x slower | +-----------------------------------------+---------+------------------------+ | react_agent_10x_sync | 22.7 ms | 22.8 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_25x300_sync | 14.9 ms | 15.0 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | react_agent_10x | 30.9 ms | 31.1 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_15x600_sync | 17.3 ms | 17.4 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_100x_checkpoint_sync | 949 ms | 956 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_15x600 | 27.4 ms | 27.6 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | react_agent_10x_checkpoint | 46.7 ms | 47.0 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_25x300 | 23.6 ms | 23.8 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_100x_sync | 514 ms | 519 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | wide_state_9x1200_sync | 17.2 ms | 17.5 ms: 1.01x slower | +-----------------------------------------+---------+------------------------+ | react_agent_100x_sync | 272 ms | 276 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | wide_state_9x1200 | 27.3 ms | 27.8 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | react_agent_100x | 344 ms | 350 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | wide_state_25x300_checkpoint | 286 ms | 290 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | wide_state_25x300_checkpoint_sync | 272 ms | 277 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | react_agent_100x_checkpoint | 930 ms | 947 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_10x_sync | 52.4 ms | 53.3 ms: 1.02x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_10x | 61.8 ms | 63.4 ms: 1.03x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_10x_checkpoint | 93.6 ms | 96.2 ms: 1.03x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_100x | 619 ms | 649 ms: 1.05x slower | +-----------------------------------------+---------+------------------------+ | fanout_to_subgraph_100x_checkpoint | 962 ms | 1.01 sec: 1.05x slower | +-----------------------------------------+---------+------------------------+ | Geometric mean | (ref) | 1.01x slower | +-----------------------------------------+---------+------------------------+ Benchmark hidden because not significant (6): wide_state_9x1200_checkpoint_sync, fanout_to_subgraph_10x_checkpoint_sync, wide_state_15x600_checkpoint, react_agent_100x_checkpoint_sync, wide_state_9x1200_checkpoint, wide_stat
import concurrent
import concurrent.futures
import inspect
import types
from functools import partial, update_wrapper
from typing import (
Expand All @@ -24,7 +25,7 @@
from langgraph.pregel.read import PregelNode
from langgraph.pregel.write import ChannelWrite, ChannelWriteEntry
from langgraph.store.base import BaseStore
from langgraph.types import RetryPolicy
from langgraph.types import RetryPolicy, StreamMode, StreamWriter

P = ParamSpec("P")
P1 = TypeVar("P1")
Expand Down Expand Up @@ -76,10 +77,32 @@ def entrypoint(
store: Optional[BaseStore] = None,
) -> Callable[[types.FunctionType], Pregel]:
def _imp(func: types.FunctionType) -> Pregel:
if inspect.isgeneratorfunction(func):

def gen_wrapper(*args: Any, writer: StreamWriter, **kwargs: Any) -> Any:
for chunk in func(*args, **kwargs):
writer(chunk)

bound = get_runnable_for_func(gen_wrapper)
stream_mode: StreamMode = "custom"
elif inspect.isasyncgenfunction(func):

async def agen_wrapper(
*args: Any, writer: StreamWriter, **kwargs: Any
) -> Any:
async for chunk in func(*args, **kwargs):
writer(chunk)

bound = get_runnable_for_func(agen_wrapper)
stream_mode = "custom"
else:
bound = get_runnable_for_func(func)
stream_mode = "updates"

return Pregel(
nodes={
func.__name__: PregelNode(
bound=get_runnable_for_func(func),
bound=bound,
triggers=[START],
channels=[START],
writers=[ChannelWrite([ChannelWriteEntry(END)], tags=[TAG_HIDDEN])],
Expand All @@ -89,7 +112,7 @@ def _imp(func: types.FunctionType) -> Pregel:
input_channels=START,
output_channels=END,
stream_channels=END,
stream_mode="updates",
stream_mode=stream_mode,
checkpointer=checkpointer,
store=store,
)
Expand Down

0 comments on commit e0a0958

Please sign in to comment.