Skip to content

Commit

Permalink
x
Browse files Browse the repository at this point in the history
  • Loading branch information
eyurtsev committed Dec 5, 2024
1 parent de123d6 commit 9f93e48
Showing 1 changed file with 168 additions and 50 deletions.
218 changes: 168 additions & 50 deletions docs/docs/how-tos/human_in_the_loop/interrupt.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,33 @@
"tags": []
},
"source": [
"# How to use `interrupt` for human-in-the-loop workflows\n",
"# How to use interrupt for human-in-the-loop workflows\n",
"\n",
"To use an `interrupt` you must pass a checkpointer.\n",
"An `interrupt` is a convenient way to support human-in-the-loop workflows.\n",
"\n",
"1. Used with checkpointers\n",
"1. Used together with `Command` to resume.\n",
"2. Interrupt information is available when streaming. If using invoke with human in the loop need to inspect next\n",
"3. Cannot be used twice in a node.\n",
"To use an `interrupt`, you must enable a checkpointer, as the feature relies on persisting the graph state.\n",
"\n",
"An interrupt can be used within a node like this:\n",
"An `interrupt` can be used within a node to pause execution and wait for input, as shown in this example:\n",
"\n",
"```python\n",
"async def some_node(state: State):\n",
" ...\n",
" # Any value that we want to surface as part of the interrupt\n",
" value = {\"foo\": \"bar\"} \n",
" # Surface any value as part of the interrupt\n",
" value = {\"question\": \"how old are you?\"} \n",
" answer = interrupt(value)\n",
" ...\n",
"```\n",
"\n",
"Graph execution willbe interrupted when the code reaches\n",
"the `interrup` function. It can be resumed by passing `Command`.\n",
"Graph execution will pause when the interrupt function is called. To resume execution, pass a `Command` with the desired resume value:\n",
"\n",
"```python\n",
"graph.invoke(Command(resume=some_value)))\n",
"```"
"for chunk in graph.stream(Command(resume=some_value), config={\"configurable\": {\"thread_id\": ...}}):\n",
" ...\n",
"```\n",
"\n",
"Remember that graph execution always restarts at the beginning of the node. Be cautious of side effects, such as API calls that mutate data, as these may inadvertently be triggered multiple times.\n",
"\n",
"When a node contains multiple interrupt calls, LangGraph maintains a list of resume values provided during graph execution. When resuming, execution always starts at the beginning of the node, and for each interrupt encountered, LangGraph checks whether a corresponding value exists in the list. Matching is strictly index-based, making the order of interrupt calls within the node critical. Users should avoid logic that dynamically removes, adds, or reorders interrupt calls between executions, as this can lead to mismatched indices. Such patterns often involve unconventional state mutations, such as altering state via `Command(resume=..., update=SOME_STATE_MUTATION)` or relying on global variables to modify the node's structure."
]
},
{
Expand All @@ -52,12 +53,12 @@
"source": [
"## Setup\n",
"\n",
"First we need to install the packages required"
"First we need to install the required packages:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 14,
"id": "cbc003a6-b45f-4526-bcf1-963d951797ae",
"metadata": {
"editable": true,
Expand Down Expand Up @@ -102,14 +103,14 @@
"tags": []
},
"source": [
"## Interrupt and resume\n",
"## Basic usage of interrupt and Command\n",
"\n",
"Here is a minimal example that shows how to `interrupt` and `resume` a graph consisting of one node."
"Here is an exmaple that shows how to use `interrupt` to interrupt the execution of a graph, and then resume the execution using the `Command` primitive."
]
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": 10,
"id": "ce902820-2739-402f-a784-867a44a3997c",
"metadata": {
"editable": true,
Expand All @@ -124,7 +125,7 @@
"output_type": "stream",
"text": [
"> Entered the node: 1 # of times\n",
"{'__interrupt__': (Interrupt(value='what is your age?', resumable=True, ns=['node:ec36fcc8-2135-a0e6-4681-fc0138772022'], when='during'),)}\n"
"{'__interrupt__': (Interrupt(value='what is your age?', resumable=True, ns=['node:62e598fa-8653-9d6d-2046-a70203020e37'], when='during'),)}\n"
]
}
],
Expand Down Expand Up @@ -163,10 +164,13 @@
"builder.add_edge(START, \"node\")\n",
"\n",
"# A checkpointer must be enabled for interrupts to work!\n",
"graph = builder.compile(checkpointer=MemorySaver())\n",
"checkpointer = MemorySaver()\n",
"graph = builder.compile(checkpointer=checkpointer)\n",
"\n",
"config = {\n",
" \"thread_id\": uuid.uuid4(),\n",
" \"configurable\": {\n",
" \"thread_id\": uuid.uuid4(),\n",
" }\n",
"}\n",
"\n",
"for chunk in graph.stream({\"foo\": \"abc\"}, config):\n",
Expand All @@ -189,7 +193,7 @@
},
{
"cell_type": "code",
"execution_count": 6,
"execution_count": 11,
"id": "c2ea37d8-4b92-442d-85e1-212e20123907",
"metadata": {
"editable": true,
Expand Down Expand Up @@ -227,11 +231,13 @@
"tags": []
},
"source": [
"!!! important \"Graph execution resumes at the start of a **node**\"\n",
"!!! important \"Graph execution resumes at the start of a node\"\n",
"\n",
" Graph execution resumes from the start of the **node** where the interrupt was raised rather than from the line where the `interrupt` was raised.\n",
"\n",
" Graph execution resumes from the **node** where the interrupt was raised rather than from the line where the `interrupt` was raised.\n",
" As a result, you should see that the node was entered 2 times rather than once!\n",
"\n",
" As a result, you should see `> Entered the node: 2 # of times`."
" Exercise care if your code has side-effects like making mutable API calls between consecutive interrupts!"
]
},
{
Expand All @@ -245,12 +251,14 @@
"tags": []
},
"source": [
"## Validation of input with resume"
"## Using multiple interrupts calls within a single node\n",
"\n",
"In some situations, you may need to use interrupt more than once within a single node. A common use case is performing runtime validation on the value supplied through `Command(resume=value)`."
]
},
{
"cell_type": "code",
"execution_count": 127,
"execution_count": 12,
"id": "30b5821e-8bb8-4076-b477-60f39ea65445",
"metadata": {
"editable": true,
Expand All @@ -265,7 +273,7 @@
"output_type": "stream",
"text": [
"> Entered the node: 1 # of times\n",
"{'__interrupt__': (Interrupt(value='What is your age?', resumable=True, ns=['node:a1c64b81-7957-8c3c-7728-e73cb81837d5'], when='during'),)}\n"
"{'__interrupt__': (Interrupt(value='What is your age?', resumable=True, ns=['node:ed4d470f-5753-d7f3-eec6-4435f5f93f72'], when='during'),)}\n"
]
}
],
Expand All @@ -275,8 +283,7 @@
"from typing import TypedDict, Annotated, Optional, Literal\n",
"\n",
"from langgraph.graph import StateGraph\n",
"from langgraph.graph.state import GraphCommand\n",
"from langgraph.constants import START, INTERRUPT\n",
"from langgraph.constants import START\n",
"from langgraph.types import interrupt, Command\n",
"from langgraph.checkpoint.memory import MemorySaver\n",
"\n",
Expand All @@ -290,30 +297,44 @@
"\n",
"counter = 0\n",
"\n",
"def node(state: State) -> GraphCommand[Literal['node']]:\n",
"def node(state: State):\n",
" global counter\n",
" counter +=1 \n",
" print(f'> Entered the node: {counter} # of times')\n",
"\n",
" answer = interrupt(\n",
" \"What is your age?\"\n",
" )\n",
" answer = None\n",
" question = \"What is your age?\"\n",
"\n",
" if not isinstance(answer, int) or answer < 0:\n",
" return GraphCommand(goto=\"node\")\n",
" while answer is None:\n",
" answer = interrupt(\n",
" question\n",
" )\n",
"\n",
" if not isinstance(answer, int) or answer < 0:\n",
" question = f\"'{answer} is not a valid age. What is your age?\"\n",
" answer = None\n",
" continue\n",
" else:\n",
" break\n",
"\n",
" return {\n",
" \"human_value\": f\"The human is {answer} years old.\"\n",
" }\n",
"\n",
" return GraphCommand(update={\"human_value\": answer})\n",
" \n",
"\n",
"builder = StateGraph(State)\n",
"builder.add_node(\"node\", node)\n",
"builder.add_edge(START, \"node\")\n",
"\n",
"# A checkpointer must be enabled for interrupts to work!\n",
"graph = builder.compile(checkpointer=MemorySaver())\n",
"checkpointer = MemorySaver()\n",
"graph = builder.compile(checkpointer=checkpointer)\n",
"\n",
"config = {\n",
" \"thread_id\": uuid.uuid4(),\n",
" \"configurable\": {\n",
" \"thread_id\": uuid.uuid4(),\n",
" }\n",
"}\n",
"\n",
"for chunk in graph.stream({\"foo\": \"abc\"}, config):\n",
Expand All @@ -336,7 +357,7 @@
},
{
"cell_type": "code",
"execution_count": 128,
"execution_count": 13,
"id": "a79e330d-f849-44ea-af37-221efcffe1a3",
"metadata": {
"editable": true,
Expand All @@ -351,9 +372,7 @@
"output_type": "stream",
"text": [
"> Entered the node: 2 # of times\n",
"{'node': None}\n",
"> Entered the node: 3 # of times\n",
"{'__interrupt__': (Interrupt(value='What is your age?', resumable=True, ns=['node:995027b2-c8ad-8951-74ff-d9b86fca74bb'], when='during'),)}\n"
"{'__interrupt__': (Interrupt(value=\"'-20 is not a valid age. What is your age?\", resumable=True, ns=['node:ed4d470f-5753-d7f3-eec6-4435f5f93f72'], when='during'),)}\n"
]
}
],
Expand All @@ -365,7 +384,28 @@
},
{
"cell_type": "code",
"execution_count": 130,
"execution_count": 14,
"id": "170b0cde-d94b-4aca-bfea-70f9927f4288",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"> Entered the node: 3 # of times\n",
"{'__interrupt__': (Interrupt(value=\"'{'foo': 'bar'} is not a valid age. What is your age?\", resumable=True, ns=['node:ed4d470f-5753-d7f3-eec6-4435f5f93f72'], when='during'),)}\n"
]
}
],
"source": [
"bad_input = {\"foo\": \"bar\"} # Not a number!\n",
"for chunk in graph.stream(Command(resume=bad_input), config):\n",
" print(chunk)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "c9d01c77-7a9c-43aa-bbfa-c969b1d3e3f2",
"metadata": {
"editable": true,
Expand All @@ -374,9 +414,18 @@
},
"tags": []
},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"> Entered the node: 4 # of times\n",
"{'node': {'human_value': 'The human is 25 years old.'}}\n"
]
}
],
"source": [
"ok_input = 5\n",
"ok_input = 25\n",
"for chunk in graph.stream(Command(resume=ok_input), config):\n",
" print(chunk)"
]
Expand All @@ -394,12 +443,12 @@
"source": [
"## Usage with invoke / ainvoke\n",
"\n",
"If you're not using `stream` or `astream`, you will need to explicitly access the state of the graph to get information about the interrupt."
"If you're using `invoke` and/or `ainvoke`, you will need to explicitly access the state of the graph using `graph.get_state(config)` to determine if there was an interrupt and if so what value it was associated with."
]
},
{
"cell_type": "code",
"execution_count": 102,
"execution_count": 21,
"id": "c9ae2c5f-c81d-4188-9f7e-f3f90e675e12",
"metadata": {
"editable": true,
Expand Down Expand Up @@ -452,15 +501,84 @@
"builder.add_edge(START, \"node\")\n",
"\n",
"# A checkpointer must be enabled for interrupts to work!\n",
"graph = builder.compile(checkpointer=MemorySaver())\n",
"checkpointer = MemorySaver()\n",
"graph = builder.compile(checkpointer=checkpointer)\n",
"\n",
"config = {\n",
" \"thread_id\": uuid.uuid4(),\n",
" \"configurable\": {\n",
" \"thread_id\": uuid.uuid4(),\n",
" }\n",
"}\n",
"\n",
"for event in graph.invoke({\"foo\": \"abc\"}, config):\n",
" print"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "f15b73f1-259b-4045-b633-2686873ef1f6",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"('node',)\n",
"\n",
"PregelTask(id='57efb8b4-1170-b872-647e-60de07598d61', name='node', path=('__pregel_pull', 'node'), error=None, interrupts=(Interrupt(value='what is your age?', resumable=True, ns=['node:57efb8b4-1170-b872-647e-60de07598d61'], when='during'),), state=None, result=None)\n",
"\n",
"(Interrupt(value='what is your age?', resumable=True, ns=['node:57efb8b4-1170-b872-647e-60de07598d61'], when='during'),)\n"
]
}
],
"source": [
"state = graph.get_state(config)\n",
"\n",
"print(state.next)\n",
"print()\n",
"print(state.tasks[0])\n",
"print()\n",
"print(state.tasks[0].interrupts)"
]
},
{
"cell_type": "markdown",
"id": "113a746b-c9b7-4efa-ad6c-516f85b9cf5f",
"metadata": {},
"source": [
"Let's resume now:"
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "69a46d8f-ebe6-4ee0-84c9-30ce81d576c9",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"> Entered the node: 2 # of times\n",
"> Received an input from the interrupt: 25\n"
]
},
{
"data": {
"text/plain": [
"{'foo': 'abc', 'human_value': 25}"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ok_input = 25\n",
"graph.invoke(Command(resume=ok_input), config)"
]
}
],
"metadata": {
Expand Down

0 comments on commit 9f93e48

Please sign in to comment.