From 9f93e48a6742888d40291756f5144285f087bab4 Mon Sep 17 00:00:00 2001 From: Eugene Yurtsev Date: Wed, 4 Dec 2024 22:19:32 -0500 Subject: [PATCH] x --- .../how-tos/human_in_the_loop/interrupt.ipynb | 218 ++++++++++++++---- 1 file changed, 168 insertions(+), 50 deletions(-) diff --git a/docs/docs/how-tos/human_in_the_loop/interrupt.ipynb b/docs/docs/how-tos/human_in_the_loop/interrupt.ipynb index fc8fd0966..043e60c6b 100644 --- a/docs/docs/how-tos/human_in_the_loop/interrupt.ipynb +++ b/docs/docs/how-tos/human_in_the_loop/interrupt.ipynb @@ -11,32 +11,33 @@ "tags": [] }, "source": [ - "# How to use `interrupt` for human-in-the-loop workflows\n", + "# How to use interrupt for human-in-the-loop workflows\n", "\n", - "To use an `interrupt` you must pass a checkpointer.\n", + "An `interrupt` is a convenient way to support human-in-the-loop workflows.\n", "\n", - "1. Used with checkpointers\n", - "1. Used together with `Command` to resume.\n", - "2. Interrupt information is available when streaming. If using invoke with human in the loop need to inspect next\n", - "3. Cannot be used twice in a node.\n", + "To use an `interrupt`, you must enable a checkpointer, as the feature relies on persisting the graph state.\n", "\n", - "An interrupt can be used within a node like this:\n", + "An `interrupt` can be used within a node to pause execution and wait for input, as shown in this example:\n", "\n", "```python\n", "async def some_node(state: State):\n", " ...\n", - " # Any value that we want to surface as part of the interrupt\n", - " value = {\"foo\": \"bar\"} \n", + " # Surface any value as part of the interrupt\n", + " value = {\"question\": \"how old are you?\"} \n", " answer = interrupt(value)\n", " ...\n", "```\n", "\n", - "Graph execution willbe interrupted when the code reaches\n", - "the `interrup` function. It can be resumed by passing `Command`.\n", + "Graph execution will pause when the interrupt function is called. To resume execution, pass a `Command` with the desired resume value:\n", "\n", "```python\n", - "graph.invoke(Command(resume=some_value)))\n", - "```" + "for chunk in graph.stream(Command(resume=some_value), config={\"configurable\": {\"thread_id\": ...}}):\n", + " ...\n", + "```\n", + "\n", + "Remember that graph execution always restarts at the beginning of the node. Be cautious of side effects, such as API calls that mutate data, as these may inadvertently be triggered multiple times.\n", + "\n", + "When a node contains multiple interrupt calls, LangGraph maintains a list of resume values provided during graph execution. When resuming, execution always starts at the beginning of the node, and for each interrupt encountered, LangGraph checks whether a corresponding value exists in the list. Matching is strictly index-based, making the order of interrupt calls within the node critical. Users should avoid logic that dynamically removes, adds, or reorders interrupt calls between executions, as this can lead to mismatched indices. Such patterns often involve unconventional state mutations, such as altering state via `Command(resume=..., update=SOME_STATE_MUTATION)` or relying on global variables to modify the node's structure." ] }, { @@ -52,12 +53,12 @@ "source": [ "## Setup\n", "\n", - "First we need to install the packages required" + "First we need to install the required packages:" ] }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 14, "id": "cbc003a6-b45f-4526-bcf1-963d951797ae", "metadata": { "editable": true, @@ -102,14 +103,14 @@ "tags": [] }, "source": [ - "## Interrupt and resume\n", + "## Basic usage of interrupt and Command\n", "\n", - "Here is a minimal example that shows how to `interrupt` and `resume` a graph consisting of one node." + "Here is an exmaple that shows how to use `interrupt` to interrupt the execution of a graph, and then resume the execution using the `Command` primitive." ] }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 10, "id": "ce902820-2739-402f-a784-867a44a3997c", "metadata": { "editable": true, @@ -124,7 +125,7 @@ "output_type": "stream", "text": [ "> Entered the node: 1 # of times\n", - "{'__interrupt__': (Interrupt(value='what is your age?', resumable=True, ns=['node:ec36fcc8-2135-a0e6-4681-fc0138772022'], when='during'),)}\n" + "{'__interrupt__': (Interrupt(value='what is your age?', resumable=True, ns=['node:62e598fa-8653-9d6d-2046-a70203020e37'], when='during'),)}\n" ] } ], @@ -163,10 +164,13 @@ "builder.add_edge(START, \"node\")\n", "\n", "# A checkpointer must be enabled for interrupts to work!\n", - "graph = builder.compile(checkpointer=MemorySaver())\n", + "checkpointer = MemorySaver()\n", + "graph = builder.compile(checkpointer=checkpointer)\n", "\n", "config = {\n", - " \"thread_id\": uuid.uuid4(),\n", + " \"configurable\": {\n", + " \"thread_id\": uuid.uuid4(),\n", + " }\n", "}\n", "\n", "for chunk in graph.stream({\"foo\": \"abc\"}, config):\n", @@ -189,7 +193,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 11, "id": "c2ea37d8-4b92-442d-85e1-212e20123907", "metadata": { "editable": true, @@ -227,11 +231,13 @@ "tags": [] }, "source": [ - "!!! important \"Graph execution resumes at the start of a **node**\"\n", + "!!! important \"Graph execution resumes at the start of a node\"\n", + "\n", + " Graph execution resumes from the start of the **node** where the interrupt was raised rather than from the line where the `interrupt` was raised.\n", "\n", - " Graph execution resumes from the **node** where the interrupt was raised rather than from the line where the `interrupt` was raised.\n", + " As a result, you should see that the node was entered 2 times rather than once!\n", "\n", - " As a result, you should see `> Entered the node: 2 # of times`." + " Exercise care if your code has side-effects like making mutable API calls between consecutive interrupts!" ] }, { @@ -245,12 +251,14 @@ "tags": [] }, "source": [ - "## Validation of input with resume" + "## Using multiple interrupts calls within a single node\n", + "\n", + "In some situations, you may need to use interrupt more than once within a single node. A common use case is performing runtime validation on the value supplied through `Command(resume=value)`." ] }, { "cell_type": "code", - "execution_count": 127, + "execution_count": 12, "id": "30b5821e-8bb8-4076-b477-60f39ea65445", "metadata": { "editable": true, @@ -265,7 +273,7 @@ "output_type": "stream", "text": [ "> Entered the node: 1 # of times\n", - "{'__interrupt__': (Interrupt(value='What is your age?', resumable=True, ns=['node:a1c64b81-7957-8c3c-7728-e73cb81837d5'], when='during'),)}\n" + "{'__interrupt__': (Interrupt(value='What is your age?', resumable=True, ns=['node:ed4d470f-5753-d7f3-eec6-4435f5f93f72'], when='during'),)}\n" ] } ], @@ -275,8 +283,7 @@ "from typing import TypedDict, Annotated, Optional, Literal\n", "\n", "from langgraph.graph import StateGraph\n", - "from langgraph.graph.state import GraphCommand\n", - "from langgraph.constants import START, INTERRUPT\n", + "from langgraph.constants import START\n", "from langgraph.types import interrupt, Command\n", "from langgraph.checkpoint.memory import MemorySaver\n", "\n", @@ -290,19 +297,30 @@ "\n", "counter = 0\n", "\n", - "def node(state: State) -> GraphCommand[Literal['node']]:\n", + "def node(state: State):\n", " global counter\n", " counter +=1 \n", " print(f'> Entered the node: {counter} # of times')\n", "\n", - " answer = interrupt(\n", - " \"What is your age?\"\n", - " )\n", + " answer = None\n", + " question = \"What is your age?\"\n", "\n", - " if not isinstance(answer, int) or answer < 0:\n", - " return GraphCommand(goto=\"node\")\n", + " while answer is None:\n", + " answer = interrupt(\n", + " question\n", + " )\n", + "\n", + " if not isinstance(answer, int) or answer < 0:\n", + " question = f\"'{answer} is not a valid age. What is your age?\"\n", + " answer = None\n", + " continue\n", + " else:\n", + " break\n", + "\n", + " return {\n", + " \"human_value\": f\"The human is {answer} years old.\"\n", + " }\n", "\n", - " return GraphCommand(update={\"human_value\": answer})\n", " \n", "\n", "builder = StateGraph(State)\n", @@ -310,10 +328,13 @@ "builder.add_edge(START, \"node\")\n", "\n", "# A checkpointer must be enabled for interrupts to work!\n", - "graph = builder.compile(checkpointer=MemorySaver())\n", + "checkpointer = MemorySaver()\n", + "graph = builder.compile(checkpointer=checkpointer)\n", "\n", "config = {\n", - " \"thread_id\": uuid.uuid4(),\n", + " \"configurable\": {\n", + " \"thread_id\": uuid.uuid4(),\n", + " }\n", "}\n", "\n", "for chunk in graph.stream({\"foo\": \"abc\"}, config):\n", @@ -336,7 +357,7 @@ }, { "cell_type": "code", - "execution_count": 128, + "execution_count": 13, "id": "a79e330d-f849-44ea-af37-221efcffe1a3", "metadata": { "editable": true, @@ -351,9 +372,7 @@ "output_type": "stream", "text": [ "> Entered the node: 2 # of times\n", - "{'node': None}\n", - "> Entered the node: 3 # of times\n", - "{'__interrupt__': (Interrupt(value='What is your age?', resumable=True, ns=['node:995027b2-c8ad-8951-74ff-d9b86fca74bb'], when='during'),)}\n" + "{'__interrupt__': (Interrupt(value=\"'-20 is not a valid age. What is your age?\", resumable=True, ns=['node:ed4d470f-5753-d7f3-eec6-4435f5f93f72'], when='during'),)}\n" ] } ], @@ -365,7 +384,28 @@ }, { "cell_type": "code", - "execution_count": 130, + "execution_count": 14, + "id": "170b0cde-d94b-4aca-bfea-70f9927f4288", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "> Entered the node: 3 # of times\n", + "{'__interrupt__': (Interrupt(value=\"'{'foo': 'bar'} is not a valid age. What is your age?\", resumable=True, ns=['node:ed4d470f-5753-d7f3-eec6-4435f5f93f72'], when='during'),)}\n" + ] + } + ], + "source": [ + "bad_input = {\"foo\": \"bar\"} # Not a number!\n", + "for chunk in graph.stream(Command(resume=bad_input), config):\n", + " print(chunk)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, "id": "c9d01c77-7a9c-43aa-bbfa-c969b1d3e3f2", "metadata": { "editable": true, @@ -374,9 +414,18 @@ }, "tags": [] }, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "> Entered the node: 4 # of times\n", + "{'node': {'human_value': 'The human is 25 years old.'}}\n" + ] + } + ], "source": [ - "ok_input = 5\n", + "ok_input = 25\n", "for chunk in graph.stream(Command(resume=ok_input), config):\n", " print(chunk)" ] @@ -394,12 +443,12 @@ "source": [ "## Usage with invoke / ainvoke\n", "\n", - "If you're not using `stream` or `astream`, you will need to explicitly access the state of the graph to get information about the interrupt." + "If you're using `invoke` and/or `ainvoke`, you will need to explicitly access the state of the graph using `graph.get_state(config)` to determine if there was an interrupt and if so what value it was associated with." ] }, { "cell_type": "code", - "execution_count": 102, + "execution_count": 21, "id": "c9ae2c5f-c81d-4188-9f7e-f3f90e675e12", "metadata": { "editable": true, @@ -452,15 +501,84 @@ "builder.add_edge(START, \"node\")\n", "\n", "# A checkpointer must be enabled for interrupts to work!\n", - "graph = builder.compile(checkpointer=MemorySaver())\n", + "checkpointer = MemorySaver()\n", + "graph = builder.compile(checkpointer=checkpointer)\n", "\n", "config = {\n", - " \"thread_id\": uuid.uuid4(),\n", + " \"configurable\": {\n", + " \"thread_id\": uuid.uuid4(),\n", + " }\n", "}\n", "\n", "for event in graph.invoke({\"foo\": \"abc\"}, config):\n", " print" ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "f15b73f1-259b-4045-b633-2686873ef1f6", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "('node',)\n", + "\n", + "PregelTask(id='57efb8b4-1170-b872-647e-60de07598d61', name='node', path=('__pregel_pull', 'node'), error=None, interrupts=(Interrupt(value='what is your age?', resumable=True, ns=['node:57efb8b4-1170-b872-647e-60de07598d61'], when='during'),), state=None, result=None)\n", + "\n", + "(Interrupt(value='what is your age?', resumable=True, ns=['node:57efb8b4-1170-b872-647e-60de07598d61'], when='during'),)\n" + ] + } + ], + "source": [ + "state = graph.get_state(config)\n", + "\n", + "print(state.next)\n", + "print()\n", + "print(state.tasks[0])\n", + "print()\n", + "print(state.tasks[0].interrupts)" + ] + }, + { + "cell_type": "markdown", + "id": "113a746b-c9b7-4efa-ad6c-516f85b9cf5f", + "metadata": {}, + "source": [ + "Let's resume now:" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "69a46d8f-ebe6-4ee0-84c9-30ce81d576c9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "> Entered the node: 2 # of times\n", + "> Received an input from the interrupt: 25\n" + ] + }, + { + "data": { + "text/plain": [ + "{'foo': 'abc', 'human_value': 25}" + ] + }, + "execution_count": 23, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ok_input = 25\n", + "graph.invoke(Command(resume=ok_input), config)" + ] } ], "metadata": {