Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resume value get reused when resuming parent graph which has subgraph with multiple interrupts #2870

Open
4 tasks done
chinazhangyujia opened this issue Dec 24, 2024 · 1 comment

Comments

@chinazhangyujia
Copy link

Checked other resources

  • This is a bug, not a usage question. For questions, please use GitHub Discussions.
  • I added a clear and detailed title that summarizes the issue.
  • I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
  • I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code run AS IS to reproduce the issue.

Example Code

# graphs
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from app.ai.agent.base import BaseAgent
from langgraph.types import Checkpointer, interrupt
from langgraph.checkpoint.memory import MemorySaver

class AgentState(TypedDict):
    input: str


def node_1(state: AgentState):
    result = interrupt("interrupt node 1")
    print("result", result)

def node_2(state: AgentState):
    result = interrupt("interrupt node 2")
    print("result", result)

sub_graph = StateGraph(AgentState).add_node("node_1", node_1).add_node("node_2", node_2).add_edge(START, "node_1").add_edge("node_1", "node_2").add_edge("node_2", END).compile()



def invoke_sub_agent(state: AgentState):
    sub_graph.invoke(state)


parent_agent = StateGraph(AgentState).add_node("invoke_sub_agent", invoke_sub_agent).add_edge(START, "invoke_sub_agent").add_edge("invoke_sub_agent", END).compile(checkpointer=MemorySaver())

# invoking the parent graph
from app.ai.agent.test import parent_agent
import uuid

thread_id = uuid.uuid4()
parent_agent.invoke({"input": "test"}, config={"configurable": {"thread_id": thread_id}}, subgraphs=True)

# then the graph get interrupted by the first interrupt as expected

# resume from the first interrupt
parent_agent.invoke(Command(resume=True), config={"configurable": {"thread_id": thread_id}}, subgraphs=True)

# then not just first interrupt get passed, the second interrupt also get passed with same resume value printed out.

The code is supposed to be interrupted twice and wait for human input for twice

Error Message and Stack Trace (if applicable)

No response

Description

As described in the Example code section. I have
parent_graph -> subgraph (two interrupts)

when invoke parent_graph, the parent_graph stops at the first subgraph interrupt as expected. Then I resume, the expected behavior is the graph should pass the first interrupt and stop at the second interrupt, however, both interrupt passed. The second interrupt returns the value I passed for resuming the first interrupt instead of raising a new interrupt.

System Info

langgraph==0.2.60

@gbaian10
Copy link
Contributor

gbaian10 commented Dec 25, 2024

I can replicate this issue; it seems to not work properly in the subgraph.

I added some extra interrupt commands in the parent graph, and it appears to function normally there.
However, in the subgraph, as described in this issue, it references the previous resume value.

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.types import Command, interrupt
from rich import get_console


def node_1(state: MessagesState) -> None:
    print("---node_1---")
    result = interrupt("interrupt node 1")
    get_console().print(f"{result = }")


def node_2(state: MessagesState) -> None:
    print("---node_2---")
    result = interrupt("interrupt node 2")
    get_console().print(f"{result = }")


sub_graph = (
    StateGraph(MessagesState)
    .add_node(node_1)
    .add_node(node_2)
    .add_edge(START, node_1.__name__)
    .add_edge(node_1.__name__, node_2.__name__)
    .add_edge(node_2.__name__, END)
    .compile()
)
sub_graph.name = "sub"


def enter_sub_agent(state: MessagesState) -> None:
    print("---enter_sub_agent---")
    result = interrupt("interrupt enter_sub_agent")
    get_console().print(f"{result = }")


def exit_sub_agent(state: MessagesState) -> None:
    print("---exit_sub_agent---")
    result = interrupt("interrupt exit_sub_agent")
    get_console().print(f"{result = }")


parent_agent = (
    StateGraph(MessagesState)
    .add_node(enter_sub_agent)
    .add_node(sub_graph)
    .add_node(exit_sub_agent)
    .add_edge(START, enter_sub_agent.__name__)
    .add_edge(enter_sub_agent.__name__, sub_graph.get_name())
    .add_edge(sub_graph.get_name(), exit_sub_agent.__name__)
    .add_edge(exit_sub_agent.__name__, END)
    .compile(checkpointer=MemorySaver())
)


config = {"configurable": {"thread_id": "1"}}
for event in parent_agent.stream(
    MessagesState(messages=[]), config, stream_mode="updates", subgraphs=True
):
    print(event)
    print("\n")


def resume(resume_value: str) -> None:
    for event in parent_agent.stream(
        Command(resume=resume_value), config, stream_mode="updates", subgraphs=True
    ):
        print(event)
        print("\n")


resume("Enter sub agent")
resume("Node 1")
resume("Node 2")
# resume("Exit sub agent")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants