Skip to content

Commit

Permalink
Merge pull request #1132 from dimagi/sk/pipeline-output-order
Browse files Browse the repository at this point in the history
[bugfix] select correct node input when multiple are possible
  • Loading branch information
snopoke authored Feb 4, 2025
2 parents 364632b + 4efc65a commit 21b7987
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 15 deletions.
53 changes: 40 additions & 13 deletions apps/pipelines/nodes/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import operator
from abc import ABC
from collections.abc import Sequence
Expand All @@ -11,8 +12,11 @@
from typing_extensions import TypedDict

from apps.experiments.models import ExperimentSession
from apps.pipelines.exceptions import PipelineNodeRunError
from apps.pipelines.logging import LoggingCallbackHandler, noop_logger

logger = logging.getLogger("ocs.pipelines")


def add_messages(left: dict, right: dict):
# If the node already has an output, create a list and append the value to it
Expand Down Expand Up @@ -110,24 +114,47 @@ def process(

self._config = config

for incoming_edge in reversed(incoming_edges):
# We assume there is only a single path that would be valid through
# the graph.
# If we wanted to have multiple parallel paths that end
# in a single node, we should give that node multiple inputs, and
# read the input from that particular input
if incoming_edge in state["outputs"]:
input = state["outputs"][incoming_edge]["message"]
break
else: # This is the first node in the graph
input = state["messages"][-1]
if not incoming_edges:
# This is the first node in the graph
node_input = state["messages"][-1]

# init temp state here to avoid having to do it in each place the pipeline is invoked
state["temp_state"]["user_input"] = input
state["temp_state"]["user_input"] = node_input
state["temp_state"]["attachments"] = [
Attachment.model_validate(att) for att in state.get("attachments", [])
]
return self._process(input=input, state=state, node_id=node_id)
elif len(incoming_edges) == 1:
incoming_edge = incoming_edges[0]
node_input = state["outputs"][incoming_edge]["message"]
else:
# Here we use some internal langgraph state to determine which input to use
# The 'langgraph_triggers' key is set in the metadata of the pipeline by langgraph. Some examples:
# - start:xyz
# - xyz
# - branch:xyz:condition:abc
# - join:abc+def:xyz
node_triggers = config["metadata"]["langgraph_triggers"]
for incoming_edge in incoming_edges:
if any(incoming_edge in trigger for trigger in node_triggers):
node_input = state["outputs"][incoming_edge]["message"]
break
else:
# This shouldn't happen, but keeping it here for now to avoid breaking
logger.warning(f"Cannot determine which input to use for node {node_id}. Switching to fallback.")
for incoming_edge in reversed(incoming_edges):
if incoming_edge in state["outputs"]:
node_input = state["outputs"][incoming_edge]["message"]
break
else:
raise PipelineNodeRunError(
f"Cannot determine which input to use for node {node_id}",
{
"node_id": node_id,
"edge_ids": incoming_edges,
"state_outputs": state["outputs"],
},
)
return self._process(input=node_input, state=state, node_id=node_id)

def process_conditional(self, state: PipelineState, node_id: str | None = None) -> str:
conditional_branch = self._process_conditional(state, node_id)
Expand Down
51 changes: 51 additions & 0 deletions apps/pipelines/tests/test_runnable_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,3 +923,54 @@ def test_parallel_nodes(pipeline):

with pytest.raises(PipelineBuildError, match="Multiple edges connected to the same output"):
create_runnable(pipeline, nodes, edges, lenient=False)


@django_db_with_data(available_apps=("apps.service_providers",))
def test_multiple_valid_inputs(pipeline):
    """Check that a node with several candidate inputs reads the correct one.

    Graph under test::

        start --> router -+-> template --> end
                          |                 ^
                          +-----------------+

    The end node has incoming edges from both 'router' and 'template'. When the
    router routes to the template node, the end node must take its input from
    'template' and not from 'router'.
    """

    def connect(edge_id, source, target, **extra):
        # Build one edge definition; optional keys such as "sourceHandle" go last.
        return {"id": edge_id, "source": source["id"], "target": target["id"], **extra}

    start = start_node()
    router = boolean_node()
    template = render_template_node("T: {{ input }}")
    end = end_node()

    # ordering of edges is significant
    edges = [
        connect("start -> router", start, router),
        connect("router -> template", router, template, sourceHandle="output_1"),
        connect("template -> end", template, end),
        connect("router -> end", router, end, sourceHandle="output_0"),
    ]
    nodes = [start, router, template, end]

    initial_state = PipelineState(
        messages=["not hello"],
        experiment_session=ExperimentSessionFactory.build(),
        pipeline_version=1,
    )
    runnable = create_runnable(pipeline, nodes, edges, lenient=False)
    result = runnable.invoke(initial_state)
    assert result["messages"][-1] == "T: not hello"
4 changes: 2 additions & 2 deletions apps/pipelines/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ def passthrough_node():
}


def boolean_node(input_equals="hello"):
    """Build a test node definition for a ``BooleanNode``.

    Args:
        input_equals: Value stored under the node's ``input_equals`` param.
            Defaults to ``"hello"``, the value that was previously hard-coded.

    Returns:
        A dict with a fresh UUID string as ``id``, the ``BooleanNode`` class
        name as ``type`` and the node ``params``.
    """
    return {
        "id": str(uuid4()),
        "type": nodes.BooleanNode.__name__,
        "params": {"name": "boolean", "input_equals": input_equals},
    }


Expand Down

0 comments on commit 21b7987

Please sign in to comment.