Skip to content

Commit

Permalink
Merge pull request #2611 from langchain-ai/vb/remote-graph-kwargs
Browse files Browse the repository at this point in the history
langgraph: allow passing kwargs to SDK methods in RemoteGraph's invoke/stream
  • Loading branch information
nfcampos authored Dec 3, 2024
2 parents 36b6cd1 + 515242d commit 2d87195
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions libs/langgraph/langgraph/pregel/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ def stream(
interrupt_before: Optional[Union[All, Sequence[str]]] = None,
interrupt_after: Optional[Union[All, Sequence[str]]] = None,
subgraphs: bool = False,
**kwargs: Any,
) -> Iterator[Union[dict[str, Any], Any]]:
"""Create a run and stream the results.
Expand All @@ -589,6 +590,7 @@ def stream(
interrupt_before: Interrupt the graph before these nodes.
interrupt_after: Interrupt the graph after these nodes.
subgraphs: Stream from subgraphs.
**kwargs: Additional params to pass to client.runs.stream.
Yields:
The output of the graph.
Expand Down Expand Up @@ -616,6 +618,7 @@ def stream(
interrupt_after=interrupt_after,
stream_subgraphs=subgraphs or stream is not None,
if_not_exists="create",
**kwargs,
):
# split mode and ns
if NS_SEP in chunk.event:
Expand Down Expand Up @@ -664,6 +667,7 @@ async def astream(
interrupt_before: Optional[Union[All, Sequence[str]]] = None,
interrupt_after: Optional[Union[All, Sequence[str]]] = None,
subgraphs: bool = False,
**kwargs: Any,
) -> AsyncIterator[Union[dict[str, Any], Any]]:
"""Create a run and stream the results.
Expand All @@ -678,6 +682,7 @@ async def astream(
interrupt_before: Interrupt the graph before these nodes.
interrupt_after: Interrupt the graph after these nodes.
subgraphs: Stream from subgraphs.
**kwargs: Additional params to pass to client.runs.stream.
Yields:
The output of the graph.
Expand Down Expand Up @@ -705,6 +710,7 @@ async def astream(
interrupt_after=interrupt_after,
stream_subgraphs=subgraphs or stream is not None,
if_not_exists="create",
**kwargs,
):
# split mode and ns
if NS_SEP in chunk.event:
Expand Down Expand Up @@ -767,18 +773,16 @@ def invoke(
*,
interrupt_before: Optional[Union[All, Sequence[str]]] = None,
interrupt_after: Optional[Union[All, Sequence[str]]] = None,
**kwargs: Any,
) -> Union[dict[str, Any], Any]:
"""Create a run, wait until it finishes and return the final state.
This method calls `POST /threads/{thread_id}/runs/wait` if a `thread_id`
is speciffed in the `configurable` field of the config or
`POST /runs/wait` otherwise.
Args:
input: Input to the graph.
config: A `RunnableConfig` for graph invocation.
interrupt_before: Interrupt the graph before these nodes.
interrupt_after: Interrupt the graph after these nodes.
**kwargs: Additional params to pass to RemoteGraph.stream.
Returns:
The output of the graph.
Expand All @@ -789,6 +793,7 @@ def invoke(
interrupt_before=interrupt_before,
interrupt_after=interrupt_after,
stream_mode="values",
**kwargs,
):
pass
try:
Expand All @@ -803,18 +808,16 @@ async def ainvoke(
*,
interrupt_before: Optional[Union[All, Sequence[str]]] = None,
interrupt_after: Optional[Union[All, Sequence[str]]] = None,
**kwargs: Any,
) -> Union[dict[str, Any], Any]:
"""Create a run, wait until it finishes and return the final state.
This method calls `POST /threads/{thread_id}/runs/wait` if a `thread_id`
is speciffed in the `configurable` field of the config or
`POST /runs/wait` otherwise.
Args:
input: Input to the graph.
config: A `RunnableConfig` for graph invocation.
interrupt_before: Interrupt the graph before these nodes.
interrupt_after: Interrupt the graph after these nodes.
**kwargs: Additional params to pass to RemoteGraph.astream.
Returns:
The output of the graph.
Expand All @@ -825,6 +828,7 @@ async def ainvoke(
interrupt_before=interrupt_before,
interrupt_after=interrupt_after,
stream_mode="values",
**kwargs,
):
pass
try:
Expand Down

0 comments on commit 2d87195

Please sign in to comment.