The merging node of LangGraph parallel branches follows different strategies in two scenarios. #2866

Open
4 tasks done
huangyuanyuan000 opened this issue Dec 24, 2024 · 2 comments

Comments

@huangyuanyuan000

huangyuanyuan000 commented Dec 24, 2024

Checked other resources

  • This is a bug, not a usage question. For questions, please use GitHub Discussions.
  • I added a clear and detailed title that summarizes the issue.
  • I read what a minimal reproducible example is (https://stackoverflow.com/help/minimal-reproducible-example).
  • I included a self-contained, minimal example that demonstrates the issue INCLUDING all the relevant imports. The code runs AS IS to reproduce the issue.

Example Code

import operator
from langgraph.graph import StateGraph
from typing import Annotated
from typing import TypedDict


class AgentState(TypedDict):
    input: str
    history: Annotated[list[dict], operator.add]


def getapp():
    def start(state):
        print("run: START")
        return {"history": [{}]}

    def end(state):
        print("run: END")
        # Return the agent's result
        return {"history": [{}]}

    def run_agent(name):
        def run(state):
            print("run: " + name)
            return {"history": [{}]}

        return run

    def conditional(state):
        # Fan out to both branches (agent2 and agent3)
        return ["agent2", "agent3"]

    workflow = StateGraph(AgentState)
    # scenario_one(conditional, end, run_agent, start, workflow)
    scenario_two(conditional, end, run_agent, start, workflow)
    app = workflow.compile()
    return app


def scenario_one(conditional, end, run_agent, start, workflow):
    """
    Scenario 1 graph:
    start -> agent1 -> agent2 -> agent4 -> end
                    -> agent3
    Branch one is agent2.
    Branch two is agent3.
    The number of nodes in branch one is equal to the number of nodes in branch two.
    """
    workflow.add_node("start", start)
    workflow.add_node("end", end)
    workflow.add_node("agent1", run_agent("agen1"))
    workflow.add_node("agent2", run_agent("agen2"))
    workflow.add_node("agent3", run_agent("agen3"))
    workflow.add_node("agent4", run_agent("agen4"))
    workflow.add_edge("start", "agent1")
    workflow.add_conditional_edges("agent1", conditional, {"agent2": "agent2", "agent3": "agent3"})
    workflow.add_edge("agent2", "agent4")
    workflow.add_edge("agent3", "agent4")
    workflow.add_edge("agent4", "end")
    workflow.set_entry_point("start")
    workflow.set_finish_point("end")


def scenario_two(conditional, end, run_agent, start, workflow):
    """
    Scenario 2 graph:
    start -> agent1 -> agent2                 -> agent4 -> end
                    -> agent3 -> agent3.5
    Branch one is agent2.
    Branch two is agent3 and agent3.5.
    The number of nodes in branch two is greater than that in branch one.
    """
    workflow.add_node("start", start)
    workflow.add_node("end", end)
    workflow.add_node("agent1", run_agent("agen1"))
    workflow.add_node("agent2", run_agent("agen2"))
    workflow.add_node("agent3", run_agent("agen3"))
    workflow.add_node("agent3.5", run_agent("agen3.5"))
    workflow.add_node("agent4", run_agent("agen4"))
    workflow.add_edge("start", "agent1")
    workflow.add_conditional_edges("agent1", conditional, {"agent2": "agent2", "agent3": "agent3"})
    workflow.add_edge("agent2", "agent4")
    workflow.add_edge("agent3", "agent4")
    workflow.add_edge("agent3", "agent3.5")
    workflow.add_edge("agent3.5", "agent4")
    workflow.add_edge("agent4", "end")
    workflow.set_entry_point("start")
    workflow.set_finish_point("end")


if __name__ == '__main__':
    getapp().invoke(input={"input": ""}, config={"recursion_limit": 10})

Error Message and Stack Trace (if applicable)

Scenario 1:

run: START
run:agen1
run:agen2
run:agen3
run:agen4
run: END

Scenario 2:

run: START
run:agen1
run:agen2
run:agen3
run:agen3.5
run:agen4
run: END
run:agen4
run: END

Description

When parallel branches merge into a single node and every branch contains the same number of nodes, the merging node runs only once, which is consistent with a Wait for All Strategy. When the branches contain different numbers of nodes, however, the merging node and every node after it run multiple times. That is inconsistent with the Wait for All Strategy and at best resembles a Parallel Merge Strategy.

System Info

LangGraph Version Description:
image

@gbaian10
Contributor

This is working as intended.
LangGraph's execution can be thought of, roughly, as a BFS (Breadth-First Search): all nodes triggered in one step run together, and the nodes they point to run in the next step.
Specifically, it runs in a way similar to what is shown in the image below.

image
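
The breadth-first behaviour described above can be sketched in plain Python. This is only a simplified model, not LangGraph's actual scheduler: the helper `simulate_supersteps` and the edge lists are hypothetical names introduced for illustration, and the scenario 2 edges follow the topology in the docstring of `scenario_two`. The assumption is that a node runs in a step whenever at least one of its predecessors ran in the previous step.

```python
from collections import defaultdict

# Edge lists taken from the two scenario docstrings in the issue.
SCENARIO_ONE = [
    ("start", "agent1"),
    ("agent1", "agent2"), ("agent1", "agent3"),
    ("agent2", "agent4"), ("agent3", "agent4"),
    ("agent4", "end"),
]
SCENARIO_TWO = [
    ("start", "agent1"),
    ("agent1", "agent2"), ("agent1", "agent3"),
    ("agent2", "agent4"),
    ("agent3", "agent3.5"), ("agent3.5", "agent4"),
    ("agent4", "end"),
]


def simulate_supersteps(edges, entry, max_steps=10):
    """Return the nodes that run in each step of a level-by-level traversal."""
    successors = defaultdict(list)
    for src, dst in edges:
        successors[src].append(dst)

    steps = []
    active = {entry}
    while active and len(steps) < max_steps:
        steps.append(sorted(active))
        # Every node that just ran triggers all of its successors.
        # Using a set means a node triggered by several predecessors
        # in the same step still runs only once in the next step.
        active = {dst for node in active for dst in successors[node]}
    return steps


if __name__ == "__main__":
    for label, edges in [("Scenario 1", SCENARIO_ONE), ("Scenario 2", SCENARIO_TWO)]:
        print(label)
        for number, nodes in enumerate(simulate_supersteps(edges, "start"), start=1):
            print(f"  step {number}: {nodes}")
```

Under this model, agent2 and agent3 finish in the same step in scenario 1, so agent4 is triggered once. In scenario 2, agent2 triggers agent4 one step before agent3.5 does, so agent4 (and then end) runs twice, which matches the reported output.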


I'm not sure what result you were expecting.
If you want all the nodes that point to the same node in the Mermaid diagram (agent4) to wait for one another, you just need to modify the graph as shown below.

image

@gbaian10
Contributor

If you want to better observe the flow of the graph, use langgraph-platform to track its movement.
You can easily insert a sleep function in the nodes to observe the flow direction. This will help you understand how the graph operates more intuitively.
