diff --git a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py index 57971e175..9ed72cd02 100644 --- a/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py +++ b/libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py @@ -358,20 +358,23 @@ def attempt(self, msg: MessageToOrchestrator) -> None: if new_tasks := [ t for t in loop.tasks.values() if not t.scheduled and not t.writes ]: + config = patch_configurable( + loop.config, + { + **loop.checkpoint_config["configurable"], + CONFIG_KEY_DEDUPE_TASKS: True, + CONFIG_KEY_ENSURE_LATEST: True, + }, + ) + if CONFIG_KEY_SCRATCHPAD in config[CONF]: + config[CONF][CONFIG_KEY_SCRATCHPAD]["subgraph_counter"] = 0 # send messages to executor futures = [ self.producer.send( self.topics.executor, value=serde.dumps( MessageToExecutor( - config=patch_configurable( - loop.config, - { - **loop.checkpoint_config["configurable"], - CONFIG_KEY_DEDUPE_TASKS: True, - CONFIG_KEY_ENSURE_LATEST: True, - }, - ), + config=config, task=ExecutorTask(id=task.id, path=task.path), finally_send=msg.get("finally_send"), )