diff --git a/libs/langgraph/langgraph/pregel/algo.py b/libs/langgraph/langgraph/pregel/algo.py index 205793ab4..39bc9462d 100644 --- a/libs/langgraph/langgraph/pregel/algo.py +++ b/libs/langgraph/langgraph/pregel/algo.py @@ -680,7 +680,7 @@ def prepare_single_task( "langgraph_checkpoint_ns": task_checkpoint_ns, } if task_id_checksum is not None: - assert task_id == task_id_checksum + assert task_id == task_id_checksum, f"{task_id} != {task_id_checksum}" if for_execution: if node := proc.node: if proc.metadata: diff --git a/libs/langgraph/langgraph/pregel/loop.py b/libs/langgraph/langgraph/pregel/loop.py index 0afe62658..1f2fe91a3 100644 --- a/libs/langgraph/langgraph/pregel/loop.py +++ b/libs/langgraph/langgraph/pregel/loop.py @@ -229,7 +229,7 @@ def __init__( if self.stream is not None and CONFIG_KEY_STREAM in config[CONF]: self.stream = DuplexStream(self.stream, config[CONF][CONFIG_KEY_STREAM]) scratchpad: Optional[PregelScratchpad] = config[CONF].get(CONFIG_KEY_SCRATCHPAD) - if scratchpad is not None: + if not self.config[CONF].get(CONFIG_KEY_DELEGATE) and scratchpad is not None: if scratchpad["subgraph_counter"]: self.config = patch_configurable( self.config, diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py index 5527ec964..57971e175 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py @@ -13,8 +13,10 @@ import langgraph.scheduler.kafka.serde as serde from langgraph.constants import ( + CONF, CONFIG_KEY_DEDUPE_TASKS, CONFIG_KEY_ENSURE_LATEST, + CONFIG_KEY_SCRATCHPAD, INTERRUPT, SCHEDULED, ) @@ -168,6 +170,16 @@ async def attempt(self, msg: MessageToOrchestrator) -> None: if new_tasks := [ t for t in loop.tasks.values() if not t.scheduled and not t.writes ]: + config = patch_configurable( + loop.config, + { + **loop.checkpoint_config["configurable"], + CONFIG_KEY_DEDUPE_TASKS: True, + CONFIG_KEY_ENSURE_LATEST: True, + }, + ) + if CONFIG_KEY_SCRATCHPAD in config[CONF]: + config[CONF][CONFIG_KEY_SCRATCHPAD]["subgraph_counter"] = 0 # send messages to executor futures = await asyncio.gather( *( @@ -175,16 +187,7 @@ async def attempt(self, msg: MessageToOrchestrator) -> None: self.topics.executor, value=serde.dumps( MessageToExecutor( - config=patch_configurable( - loop.config, - { - **loop.checkpoint_config[ - "configurable" - ], - CONFIG_KEY_DEDUPE_TASKS: True, - CONFIG_KEY_ENSURE_LATEST: True, - }, - ), + config=config, task=ExecutorTask(id=task.id, path=task.path), finally_send=msg.get("finally_send"), ) diff --git a/libs/scheduler-kafka/tests/any.py b/libs/scheduler-kafka/tests/any.py index 3ea224173..0336d8506 100644 --- a/libs/scheduler-kafka/tests/any.py +++ b/libs/scheduler-kafka/tests/any.py @@ -53,3 +53,11 @@ def __eq__(self, other: object) -> bool: return False else: return True + + +class AnyInt(int): + def __init__(self) -> None: + super().__init__() + + def __eq__(self, other: object) -> bool: + return isinstance(other, int) diff --git a/libs/scheduler-kafka/tests/test_subgraph.py b/libs/scheduler-kafka/tests/test_subgraph.py index 55303cba0..053eaedd0 100644 --- a/libs/scheduler-kafka/tests/test_subgraph.py +++ b/libs/scheduler-kafka/tests/test_subgraph.py @@ -15,7 +15,7 @@ from langgraph.pregel import Pregel from langgraph.scheduler.kafka import serde from langgraph.scheduler.kafka.types import MessageToOrchestrator, Topics -from tests.any import AnyDict +from tests.any import AnyDict, AnyInt from tests.drain import drain_topics_async from tests.messages import _AnyIdAIMessage, _AnyIdHumanMessage @@ -198,6 +198,7 @@ async def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_task_id": history[0].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -269,6 +270,7 @@ async def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_task_id": history[0].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -370,6 +372,7 @@ async def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_task_id": history[0].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -481,6 +484,7 @@ async def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_task_id": history[1].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -547,6 +551,7 @@ async def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_task_id": history[1].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -669,6 +674,7 @@ async def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_task_id": history[1].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, diff --git a/libs/scheduler-kafka/tests/test_subgraph_sync.py b/libs/scheduler-kafka/tests/test_subgraph_sync.py index a67919dda..7d5de920c 100644 --- a/libs/scheduler-kafka/tests/test_subgraph_sync.py +++ b/libs/scheduler-kafka/tests/test_subgraph_sync.py @@ -15,7 +15,7 @@ from langgraph.scheduler.kafka import serde from langgraph.scheduler.kafka.default_sync import DefaultProducer from langgraph.scheduler.kafka.types import MessageToOrchestrator, Topics -from tests.any import AnyDict +from tests.any import AnyDict, AnyInt from tests.drain import drain_topics from tests.messages import _AnyIdAIMessage, _AnyIdHumanMessage @@ -197,6 +197,7 @@ def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_task_id": history[0].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -268,6 +269,7 @@ def test_subgraph_w_interrupt( "__pregel_resuming": False, "__pregel_task_id": history[0].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -369,6 +371,7 @@ def test_subgraph_w_interrupt( "__pregel_resuming": False, "__pregel_task_id": history[0].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -479,6 +482,7 @@ def test_subgraph_w_interrupt( "__pregel_resuming": True, "__pregel_task_id": history[1].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -545,6 +549,7 @@ def test_subgraph_w_interrupt( "__pregel_resuming": True, "__pregel_task_id": history[1].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None, @@ -667,6 +672,7 @@ def test_subgraph_w_interrupt( "__pregel_store": None, "__pregel_task_id": history[1].tasks[0].id, "__pregel_scratchpad": { + "subgraph_counter": AnyInt(), "call_counter": 0, "interrupt_counter": -1, "null_resume": None,