diff --git a/libs/langgraph/langgraph/constants.py b/libs/langgraph/langgraph/constants.py
index 478f23d68..e2d9f069a 100644
--- a/libs/langgraph/langgraph/constants.py
+++ b/libs/langgraph/langgraph/constants.py
@@ -72,9 +72,11 @@
 CONFIG_KEY_CHECKPOINT_NS = sys.intern("checkpoint_ns")
 # holds the current checkpoint_ns, "" for root graph
 CONFIG_KEY_NODE_FINISHED = sys.intern("__pregel_node_finished")
 # callback to be called when a node is finished
-CONFIG_KEY_RESUME_VALUE = sys.intern("__pregel_resume_value")
-# holds the value that "answers" an interrupt() call
+CONFIG_KEY_WRITES = sys.intern("__pregel_writes")
+# read-only list of existing task writes
+CONFIG_KEY_SCRATCHPAD = sys.intern("__pregel_scratchpad")
+# holds a mutable dict for temporary storage scoped to the current task
 
 # --- Other constants ---
 PUSH = sys.intern("__pregel_push")
diff --git a/libs/langgraph/langgraph/pregel/algo.py b/libs/langgraph/langgraph/pregel/algo.py
index 5d104a85f..0885f12aa 100644
--- a/libs/langgraph/langgraph/pregel/algo.py
+++ b/libs/langgraph/langgraph/pregel/algo.py
@@ -37,13 +37,13 @@
     CONFIG_KEY_CHECKPOINT_NS,
     CONFIG_KEY_CHECKPOINTER,
     CONFIG_KEY_READ,
-    CONFIG_KEY_RESUME_VALUE,
+    CONFIG_KEY_SCRATCHPAD,
     CONFIG_KEY_SEND,
     CONFIG_KEY_STORE,
     CONFIG_KEY_TASK_ID,
+    CONFIG_KEY_WRITES,
     EMPTY_SEQ,
     INTERRUPT,
-    MISSING,
     NO_WRITES,
     NS_END,
     NS_SEP,
@@ -589,14 +589,13 @@
                     },
                     CONFIG_KEY_CHECKPOINT_ID: None,
                     CONFIG_KEY_CHECKPOINT_NS: task_checkpoint_ns,
-                    CONFIG_KEY_RESUME_VALUE: next(
-                        (
-                            v
-                            for tid, c, v in pending_writes
-                            if tid in (NULL_TASK_ID, task_id) and c == RESUME
-                        ),
-                        configurable.get(CONFIG_KEY_RESUME_VALUE, MISSING),
-                    ),
+                    CONFIG_KEY_WRITES: [
+                        w
+                        for w in pending_writes
+                        + configurable.get(CONFIG_KEY_WRITES, [])
+                        if w[0] in (NULL_TASK_ID, task_id)
+                    ],
+                    CONFIG_KEY_SCRATCHPAD: {},
                 },
             ),
             triggers,
@@ -713,15 +712,13 @@
                         },
                         CONFIG_KEY_CHECKPOINT_ID: None,
                         CONFIG_KEY_CHECKPOINT_NS: task_checkpoint_ns,
-                        CONFIG_KEY_RESUME_VALUE: next(
-                            (
-                                v
-                                for tid, c, v in pending_writes
-                                if tid in (NULL_TASK_ID, task_id)
-                                and c == RESUME
-                            ),
-                            configurable.get(CONFIG_KEY_RESUME_VALUE, MISSING),
-                        ),
+                        CONFIG_KEY_WRITES: [
+                            w
+                            for w in pending_writes
+                            + configurable.get(CONFIG_KEY_WRITES, [])
+                            if w[0] in (NULL_TASK_ID, task_id)
+                        ],
+                        CONFIG_KEY_SCRATCHPAD: {},
                     },
                 ),
                 triggers,
diff --git a/libs/langgraph/langgraph/pregel/io.py b/libs/langgraph/langgraph/pregel/io.py
index 693dffce2..ed2c28938 100644
--- a/libs/langgraph/langgraph/pregel/io.py
+++ b/libs/langgraph/langgraph/pregel/io.py
@@ -4,6 +4,7 @@
 from langchain_core.runnables.utils import AddableDict
 
 from langgraph.channels.base import BaseChannel, EmptyChannelError
+from langgraph.checkpoint.base import PendingWrite
 from langgraph.constants import (
     EMPTY_SEQ,
     ERROR,
@@ -66,7 +67,7 @@ def read_channels(
 
 
 def map_command(
-    cmd: Command,
+    cmd: Command, pending_writes: list[PendingWrite]
 ) -> Iterator[tuple[str, str, Any]]:
     """Map input chunk to a sequence of pending writes in the form (channel, value)."""
     if cmd.graph == Command.PARENT:
@@ -85,7 +86,11 @@ def map_command(
     if cmd.resume:
         if isinstance(cmd.resume, dict) and all(is_task_id(k) for k in cmd.resume):
             for tid, resume in cmd.resume.items():
-                yield (tid, RESUME, resume)
+                existing: list[Any] = next(
+                    (w[2] for w in pending_writes if w[0] == tid and w[1] == RESUME), []
+                )
+                existing.append(resume)
+                yield (tid, RESUME, existing)
         else:
             yield (NULL_TASK_ID, RESUME, cmd.resume)
     if cmd.update:
diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py
index 2a68b00f2..d9af9279e 100644
--- a/libs/langgraph/langgraph/pregel/loop.py
+++ b/libs/langgraph/langgraph/pregel/loop.py
@@ -26,6 +26,7 @@
 from langgraph.channels.base import BaseChannel
 from langgraph.checkpoint.base import (
+    WRITES_IDX_MAP,
     BaseCheckpointSaver,
     ChannelVersions,
     Checkpoint,
@@ -263,8 +264,28 @@ def put_writes(self, task_id: str, writes: Sequence[tuple[str, Any]]) -> None:
         """Put writes for a task, to be read by the next tick."""
         if not writes:
             return
+        # deduplicate writes to special channels, last write wins
+        if all(w[0] in WRITES_IDX_MAP for w in writes):
+            writes = list({w[0]: w for w in writes}.values())
         # save writes
-        self.checkpoint_pending_writes.extend((task_id, k, v) for k, v in writes)
+        for c, v in writes:
+            if (
+                c in WRITES_IDX_MAP
+                and (
+                    idx := next(
+                        (
+                            i
+                            for i, w in enumerate(self.checkpoint_pending_writes)
+                            if w[0] == task_id and w[1] == c
+                        ),
+                        None,
+                    )
+                )
+                is not None
+            ):
+                self.checkpoint_pending_writes[idx] = (task_id, c, v)
+            else:
+                self.checkpoint_pending_writes.append((task_id, c, v))
         if self.checkpointer_put_writes is not None:
             self.submit(
                 self.checkpointer_put_writes,
@@ -536,7 +557,7 @@ def _first(self, *, input_keys: Union[str, Sequence[str]]) -> None:
         elif isinstance(self.input, Command):
             writes: defaultdict[str, list[tuple[str, Any]]] = defaultdict(list)
             # group writes by task ID
-            for tid, c, v in map_command(self.input):
+            for tid, c, v in map_command(self.input, self.checkpoint_pending_writes):
                 writes[tid].append((c, v))
             if not writes:
                 raise EmptyInputError("Received empty Command input")
diff --git a/libs/langgraph/langgraph/pregel/runner.py b/libs/langgraph/langgraph/pregel/runner.py
index 9e3879b0f..f46210459 100644
--- a/libs/langgraph/langgraph/pregel/runner.py
+++ b/libs/langgraph/langgraph/pregel/runner.py
@@ -21,6 +21,7 @@
     INTERRUPT,
     NO_WRITES,
     PUSH,
+    RESUME,
     TAG_HIDDEN,
 )
 from langgraph.errors import GraphBubbleUp, GraphInterrupt
@@ -297,6 +298,8 @@ def commit(
         if isinstance(exception, GraphInterrupt):
             # save interrupt to checkpointer
             if interrupts := [(INTERRUPT, i) for i in exception.args[0]]:
+                if resumes := [w for w in task.writes if w[0] == RESUME]:
+                    interrupts.extend(resumes)
                 self.put_writes(task.id, interrupts)
         elif isinstance(exception, GraphBubbleUp):
             raise exception
diff --git a/libs/langgraph/langgraph/types.py b/libs/langgraph/langgraph/types.py
index 7bf9148c5..4047a28f7 100644
--- a/libs/langgraph/langgraph/types.py
+++ b/libs/langgraph/langgraph/types.py
@@ -13,6 +13,7 @@
     Optional,
     Sequence,
     Type,
+    TypedDict,
     TypeVar,
     Union,
     cast,
@@ -21,11 +22,16 @@
 from langchain_core.runnables import Runnable, RunnableConfig
 from typing_extensions import Self
 
-from langgraph.checkpoint.base import BaseCheckpointSaver, CheckpointMetadata
+from langgraph.checkpoint.base import (
+    BaseCheckpointSaver,
+    CheckpointMetadata,
+    PendingWrite,
+)
 
 if TYPE_CHECKING:
     from langgraph.store.base import BaseStore
 
+
 All = Literal["*"]
 """Special value to indicate that graph should interrupt on all nodes."""
@@ -300,26 +306,59 @@ def __init__(
         self.stop = stop
 
 
+class PregelScratchpad(TypedDict, total=False):
+    interrupt_counter: int
+    used_null_resume: bool
+    resume: list[Any]
+
+
 def interrupt(value: Any) -> Any:
     from langgraph.constants import (
         CONFIG_KEY_CHECKPOINT_NS,
-        CONFIG_KEY_RESUME_VALUE,
-        MISSING,
+        CONFIG_KEY_SCRATCHPAD,
+        CONFIG_KEY_SEND,
+        CONFIG_KEY_TASK_ID,
+        CONFIG_KEY_WRITES,
         NS_SEP,
+        NULL_TASK_ID,
+        RESUME,
     )
     from langgraph.errors import GraphInterrupt
     from langgraph.utils.config import get_configurable
 
     conf = get_configurable()
-    if (resume := conf.get(CONFIG_KEY_RESUME_VALUE, MISSING)) and resume is not MISSING:
-        return resume
+    # track interrupt index
+    scratchpad: PregelScratchpad = conf[CONFIG_KEY_SCRATCHPAD]
+    if "interrupt_counter" not in scratchpad:
+        scratchpad["interrupt_counter"] = 0
     else:
-        raise GraphInterrupt(
-            (
-                Interrupt(
-                    value=value,
-                    resumable=True,
-                    ns=cast(str, conf[CONFIG_KEY_CHECKPOINT_NS]).split(NS_SEP),
-                ),
-            )
+        scratchpad["interrupt_counter"] += 1
+    idx = scratchpad["interrupt_counter"]
+    # find previous resume values
+    task_id = conf[CONFIG_KEY_TASK_ID]
+    writes: list[PendingWrite] = conf[CONFIG_KEY_WRITES]
+    scratchpad.setdefault(
+        "resume", next((w[2] for w in writes if w[0] == task_id and w[1] == RESUME), [])
+    )
+    if scratchpad["resume"]:
+        if idx < len(scratchpad["resume"]):
+            return scratchpad["resume"][idx]
+    # find current resume value
+    if not scratchpad.get("used_null_resume"):
+        scratchpad["used_null_resume"] = True
+        for tid, c, v in sorted(writes, key=lambda x: x[0], reverse=True):
+            if tid == NULL_TASK_ID and c == RESUME:
+                assert len(scratchpad["resume"]) == idx, (scratchpad["resume"], idx)
+                scratchpad["resume"].append(v)
+                conf[CONFIG_KEY_SEND]([(RESUME, scratchpad["resume"])])
+                return v
+    # no resume value found
+    raise GraphInterrupt(
+        (
+            Interrupt(
+                value=value,
+                resumable=True,
+                ns=cast(str, conf[CONFIG_KEY_CHECKPOINT_NS]).split(NS_SEP),
+            ),
         )
+    )
diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py
index 4ff56ca3e..4f768badd 100644
--- a/libs/langgraph/tests/test_pregel.py
+++ b/libs/langgraph/tests/test_pregel.py
@@ -14680,3 +14680,143 @@ def bar(state):
     assert graph.invoke({"baz": ""}, thread1)
     # Resume with answer
     assert graph.invoke(Command(resume="bar"), thread1)
+
+
+@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_SYNC)
+def test_interrupt_multiple(request: pytest.FixtureRequest, checkpointer_name: str):
+    checkpointer = request.getfixturevalue(f"checkpointer_{checkpointer_name}")
+
+    class State(TypedDict):
+        my_key: Annotated[str, operator.add]
+
+    def node(s: State) -> State:
+        answer = interrupt({"value": 1})
+        answer2 = interrupt({"value": 2})
+        return {"my_key": answer + " " + answer2}
+
+    builder = StateGraph(State)
+    builder.add_node("node", node)
+    builder.add_edge(START, "node")
+
+    graph = builder.compile(checkpointer=checkpointer)
+    thread1 = {"configurable": {"thread_id": "1"}}
+
+    assert [e for e in graph.stream({"my_key": "DE", "market": "DE"}, thread1)] == [
+        {
+            "__interrupt__": (
+                Interrupt(
+                    value={"value": 1},
+                    resumable=True,
+                    ns=[AnyStr("node:")],
+                    when="during",
+                ),
+            )
+        }
+    ]
+
+    assert [
+        event
+        for event in graph.stream(
+            Command(resume="answer 1", update={"my_key": "foofoo"}), thread1
+        )
+    ] == [
+        {
+            "__interrupt__": (
+                Interrupt(
+                    value={"value": 2},
+                    resumable=True,
+                    ns=[AnyStr("node:")],
+                    when="during",
+                ),
+            )
+        }
+    ]
+
+    assert [event for event in graph.stream(Command(resume="answer 2"), thread1)] == [
+        {"node": {"my_key": "answer 1 answer 2"}},
+    ]
+
+
+@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_SYNC)
+def test_interrupt_loop(request: pytest.FixtureRequest, checkpointer_name: str):
+    checkpointer = request.getfixturevalue(f"checkpointer_{checkpointer_name}")
+
+    class State(TypedDict):
+        age: int
+        other: str
+
+    def ask_age(s: State):
help.""" + question = "How old are you?" + value = None + for _ in range(10): + value: str = interrupt(question) + if not value.isdigit() or int(value) < 18: + question = "invalid response" + value = None + else: + break + + return {"age": int(value)} + + builder = StateGraph(State) + builder.add_node("node", ask_age) + builder.add_edge(START, "node") + + graph = builder.compile(checkpointer=checkpointer) + thread1 = {"configurable": {"thread_id": "1"}} + + assert [e for e in graph.stream({"other": ""}, thread1)] == [ + { + "__interrupt__": ( + Interrupt( + value="How old are you?", + resumable=True, + ns=[AnyStr("node:")], + when="during", + ), + ) + } + ] + + assert [ + event + for event in graph.stream( + Command(resume="13"), + thread1, + ) + ] == [ + { + "__interrupt__": ( + Interrupt( + value="invalid response", + resumable=True, + ns=[AnyStr("node:")], + when="during", + ), + ) + } + ] + + assert [ + event + for event in graph.stream( + Command(resume="15"), + thread1, + ) + ] == [ + { + "__interrupt__": ( + Interrupt( + value="invalid response", + resumable=True, + ns=[AnyStr("node:")], + when="during", + ), + ) + } + ] + + assert [event for event in graph.stream(Command(resume="19"), thread1)] == [ + {"node": {"age": 19}}, + ] diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index cc244c363..addb18b2b 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -12896,3 +12896,160 @@ def bar(state): assert await graph.ainvoke({"baz": ""}, thread1) # Resume with answer assert await graph.ainvoke(Command(resume="bar"), thread1) + + +@pytest.mark.skipif( + sys.version_info < (3, 11), + reason="Python 3.11+ is required for async contextvars support", +) +@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_ASYNC) +async def test_interrupt_multiple(checkpointer_name: str): + class State(TypedDict): + my_key: Annotated[str, operator.add] + + async def node(s: State) -> State: + answer = interrupt({"value": 1}) + answer2 = interrupt({"value": 2}) + return {"my_key": answer + " " + answer2} + + builder = StateGraph(State) + builder.add_node("node", node) + builder.add_edge(START, "node") + + async with awith_checkpointer(checkpointer_name) as checkpointer: + graph = builder.compile(checkpointer=checkpointer) + thread1 = {"configurable": {"thread_id": "1"}} + + assert [ + e async for e in graph.astream({"my_key": "DE", "market": "DE"}, thread1) + ] == [ + { + "__interrupt__": ( + Interrupt( + value={"value": 1}, + resumable=True, + ns=[AnyStr("node:")], + when="during", + ), + ) + } + ] + + assert [ + event + async for event in graph.astream( + Command(resume="answer 1", update={"my_key": "foofoo"}), + thread1, + stream_mode="updates", + ) + ] == [ + { + "__interrupt__": ( + Interrupt( + value={"value": 2}, + resumable=True, + ns=[AnyStr("node:")], + when="during", + ), + ) + } + ] + + assert [ + event + async for event in graph.astream( + Command(resume="answer 2"), thread1, stream_mode="updates" + ) + ] == [ + {"node": {"my_key": "answer 1 answer 2"}}, + ] + + +@pytest.mark.skipif( + sys.version_info < (3, 11), + reason="Python 3.11+ is required for async contextvars support", +) +@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_ASYNC) +async def test_interrupt_loop(checkpointer_name: str): + class State(TypedDict): + age: int + other: str + + async def ask_age(s: State): + """Ask an expert for help.""" + question = "How old are you?" 
+        value = None
+        for _ in range(10):
+            value: str = interrupt(question)
+            if not value.isdigit() or int(value) < 18:
+                question = "invalid response"
+                value = None
+            else:
+                break
+
+        return {"age": int(value)}
+
+    builder = StateGraph(State)
+    builder.add_node("node", ask_age)
+    builder.add_edge(START, "node")
+
+    async with awith_checkpointer(checkpointer_name) as checkpointer:
+        graph = builder.compile(checkpointer=checkpointer)
+        thread1 = {"configurable": {"thread_id": "1"}}
+
+        assert [e async for e in graph.astream({"other": ""}, thread1)] == [
+            {
+                "__interrupt__": (
+                    Interrupt(
+                        value="How old are you?",
+                        resumable=True,
+                        ns=[AnyStr("node:")],
+                        when="during",
+                    ),
+                )
+            }
+        ]
+
+        assert [
+            event
+            async for event in graph.astream(
+                Command(resume="13"),
+                thread1,
+            )
+        ] == [
+            {
+                "__interrupt__": (
+                    Interrupt(
+                        value="invalid response",
+                        resumable=True,
+                        ns=[AnyStr("node:")],
+                        when="during",
+                    ),
+                )
+            }
+        ]
+
+        assert [
+            event
+            async for event in graph.astream(
+                Command(resume="15"),
+                thread1,
+            )
+        ] == [
+            {
+                "__interrupt__": (
+                    Interrupt(
+                        value="invalid response",
+                        resumable=True,
+                        ns=[AnyStr("node:")],
+                        when="during",
+                    ),
+                )
+            }
+        ]
+
+        assert [
+            event async for event in graph.astream(Command(resume="19"), thread1)
+        ] == [
+            {"node": {"age": 19}},
+        ]
diff --git a/libs/scheduler-kafka/tests/any.py b/libs/scheduler-kafka/tests/any.py
index 73744a1e8..3ea224173 100644
--- a/libs/scheduler-kafka/tests/any.py
+++ b/libs/scheduler-kafka/tests/any.py
@@ -35,3 +35,21 @@ def __eq__(self, other: object) -> bool:
                 return False
         else:
             return True
+
+
+class AnyList(list):
+    def __init__(self, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+
+    def __eq__(self, other: object) -> bool:
+        if not self and isinstance(other, list):
+            return True
+        if not isinstance(other, list) or len(self) != len(other):
+            return False
+        for i, v in enumerate(self):
+            if v == other[i]:
+                continue
+            else:
+                return False
+        else:
+            return True
diff --git a/libs/scheduler-kafka/tests/test_subgraph.py b/libs/scheduler-kafka/tests/test_subgraph.py
index ebaaea580..4ab92676c 100644
--- a/libs/scheduler-kafka/tests/test_subgraph.py
+++ b/libs/scheduler-kafka/tests/test_subgraph.py
@@ -15,7 +15,7 @@
 from langgraph.pregel import Pregel
 from langgraph.scheduler.kafka import serde
 from langgraph.scheduler.kafka.types import MessageToOrchestrator, Topics
-from tests.any import AnyDict
+from tests.any import AnyDict, AnyList
 from tests.drain import drain_topics_async
 from tests.messages import _AnyIdAIMessage, _AnyIdHumanMessage
@@ -196,7 +196,8 @@ async def test_subgraph_w_interrupt(
                 "__pregel_resuming": False,
                 "__pregel_store": None,
                 "__pregel_task_id": history[0].tasks[0].id,
-                "__pregel_resume_value": None,
+                "__pregel_scratchpad": {},
+                "__pregel_writes": AnyList(),
                 "checkpoint_id": None,
                 "checkpoint_map": {
                     "": history[0].config["configurable"]["checkpoint_id"]
@@ -261,7 +262,8 @@ async def test_subgraph_w_interrupt(
                 "__pregel_resuming": False,
                 "__pregel_store": None,
                 "__pregel_task_id": history[0].tasks[0].id,
-                "__pregel_resume_value": None,
+                "__pregel_scratchpad": {},
+                "__pregel_writes": AnyList(),
                 "checkpoint_id": c.config["configurable"]["checkpoint_id"],
                 "checkpoint_map": {
                     "": history[0].config["configurable"]["checkpoint_id"]
@@ -356,7 +358,8 @@ async def test_subgraph_w_interrupt(
                 "__pregel_resuming": False,
                 "__pregel_store": None,
                 "__pregel_task_id": history[0].tasks[0].id,
-                "__pregel_resume_value": None,
+                "__pregel_scratchpad": {},
AnyList(), "checkpoint_id": c.config["configurable"]["checkpoint_id"], "checkpoint_map": { "": history[0].config["configurable"]["checkpoint_id"] @@ -461,7 +464,8 @@ async def test_subgraph_w_interrupt( "__pregel_resuming": True, "__pregel_store": None, "__pregel_task_id": history[1].tasks[0].id, - "__pregel_resume_value": None, + "__pregel_scratchpad": {}, + "__pregel_writes": AnyList(), "checkpoint_id": None, "checkpoint_map": { "": history[1].config["configurable"]["checkpoint_id"] @@ -521,7 +525,8 @@ async def test_subgraph_w_interrupt( "__pregel_resuming": True, "__pregel_store": None, "__pregel_task_id": history[1].tasks[0].id, - "__pregel_resume_value": None, + "__pregel_scratchpad": {}, + "__pregel_writes": AnyList(), "checkpoint_id": c.config["configurable"]["checkpoint_id"], "checkpoint_map": { "": history[1].config["configurable"]["checkpoint_id"] @@ -637,7 +642,8 @@ async def test_subgraph_w_interrupt( "__pregel_resuming": True, "__pregel_store": None, "__pregel_task_id": history[1].tasks[0].id, - "__pregel_resume_value": None, + "__pregel_scratchpad": {}, + "__pregel_writes": AnyList(), "checkpoint_id": c.config["configurable"]["checkpoint_id"], "checkpoint_map": { "": history[1].config["configurable"]["checkpoint_id"] diff --git a/libs/scheduler-kafka/tests/test_subgraph_sync.py b/libs/scheduler-kafka/tests/test_subgraph_sync.py index 75b9d6e73..5fa43998a 100644 --- a/libs/scheduler-kafka/tests/test_subgraph_sync.py +++ b/libs/scheduler-kafka/tests/test_subgraph_sync.py @@ -15,7 +15,7 @@ from langgraph.scheduler.kafka import serde from langgraph.scheduler.kafka.default_sync import DefaultProducer from langgraph.scheduler.kafka.types import MessageToOrchestrator, Topics -from tests.any import AnyDict +from tests.any import AnyDict, AnyList from tests.drain import drain_topics from tests.messages import _AnyIdAIMessage, _AnyIdHumanMessage @@ -195,7 +195,8 @@ def test_subgraph_w_interrupt( "__pregel_resuming": False, "__pregel_store": None, "__pregel_task_id": history[0].tasks[0].id, - "__pregel_resume_value": None, + "__pregel_scratchpad": {}, + "__pregel_writes": AnyList(), "checkpoint_id": None, "checkpoint_map": { "": history[0].config["configurable"]["checkpoint_id"] @@ -260,7 +261,8 @@ def test_subgraph_w_interrupt( "__pregel_dedupe_tasks": True, "__pregel_resuming": False, "__pregel_task_id": history[0].tasks[0].id, - "__pregel_resume_value": None, + "__pregel_scratchpad": {}, + "__pregel_writes": AnyList(), "checkpoint_id": c.config["configurable"]["checkpoint_id"], "checkpoint_map": { "": history[0].config["configurable"]["checkpoint_id"] @@ -355,7 +357,8 @@ def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_resuming": False, "__pregel_task_id": history[0].tasks[0].id, - "__pregel_resume_value": None, + "__pregel_scratchpad": {}, + "__pregel_writes": AnyList(), "checkpoint_id": c.config["configurable"]["checkpoint_id"], "checkpoint_map": { "": history[0].config["configurable"]["checkpoint_id"] @@ -459,7 +462,8 @@ def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_resuming": True, "__pregel_task_id": history[1].tasks[0].id, - "__pregel_resume_value": None, + "__pregel_scratchpad": {}, + "__pregel_writes": AnyList(), "checkpoint_id": None, "checkpoint_map": { "": history[1].config["configurable"]["checkpoint_id"] @@ -519,7 +523,8 @@ def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_resuming": True, "__pregel_task_id": history[1].tasks[0].id, - "__pregel_resume_value": None, + "__pregel_scratchpad": {}, + "__pregel_writes": 
AnyList(), "checkpoint_id": c.config["configurable"]["checkpoint_id"], "checkpoint_map": { "": history[1].config["configurable"]["checkpoint_id"] @@ -635,7 +640,8 @@ def test_subgraph_w_interrupt( "__pregel_resuming": True, "__pregel_store": None, "__pregel_task_id": history[1].tasks[0].id, - "__pregel_resume_value": None, + "__pregel_scratchpad": {}, + "__pregel_writes": AnyList(), "checkpoint_id": c.config["configurable"]["checkpoint_id"], "checkpoint_map": { "": history[1].config["configurable"]["checkpoint_id"]