Skip to content

Commit

Permalink
Fix sync
Browse files Browse the repository at this point in the history
  • Loading branch information
nfcampos committed Jan 16, 2025
1 parent c26b0e7 commit 6b93698
Showing 1 changed file with 11 additions and 8 deletions.
19 changes: 11 additions & 8 deletions libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,20 +358,23 @@ def attempt(self, msg: MessageToOrchestrator) -> None:
if new_tasks := [
t for t in loop.tasks.values() if not t.scheduled and not t.writes
]:
config = patch_configurable(
loop.config,
{
**loop.checkpoint_config["configurable"],
CONFIG_KEY_DEDUPE_TASKS: True,
CONFIG_KEY_ENSURE_LATEST: True,
},
)
if CONFIG_KEY_SCRATCHPAD in config[CONF]:
config[CONF][CONFIG_KEY_SCRATCHPAD]["subgraph_counter"] = 0
# send messages to executor
futures = [
self.producer.send(
self.topics.executor,
value=serde.dumps(
MessageToExecutor(
config=patch_configurable(
loop.config,
{
**loop.checkpoint_config["configurable"],
CONFIG_KEY_DEDUPE_TASKS: True,
CONFIG_KEY_ENSURE_LATEST: True,
},
),
config=config,
task=ExecutorTask(id=task.id, path=task.path),
finally_send=msg.get("finally_send"),
)
Expand Down

0 comments on commit 6b93698

Please sign in to comment.