Skip to content

Commit

Permalink
Fix kafka lib
Browse files Browse the repository at this point in the history
  • Loading branch information
nfcampos committed Jan 16, 2025
1 parent f4bd023 commit c26b0e7
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 14 deletions.
2 changes: 1 addition & 1 deletion libs/langgraph/langgraph/pregel/algo.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ def prepare_single_task(
"langgraph_checkpoint_ns": task_checkpoint_ns,
}
if task_id_checksum is not None:
assert task_id == task_id_checksum
assert task_id == task_id_checksum, f"{task_id} != {task_id_checksum}"
if for_execution:
if node := proc.node:
if proc.metadata:
Expand Down
2 changes: 1 addition & 1 deletion libs/langgraph/langgraph/pregel/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ def __init__(
if self.stream is not None and CONFIG_KEY_STREAM in config[CONF]:
self.stream = DuplexStream(self.stream, config[CONF][CONFIG_KEY_STREAM])
scratchpad: Optional[PregelScratchpad] = config[CONF].get(CONFIG_KEY_SCRATCHPAD)
if scratchpad is not None:
if not self.config[CONF].get(CONFIG_KEY_DELEGATE) and scratchpad is not None:
if scratchpad["subgraph_counter"]:
self.config = patch_configurable(
self.config,
Expand Down
23 changes: 13 additions & 10 deletions libs/scheduler-kafka/langgraph/scheduler/kafka/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

import langgraph.scheduler.kafka.serde as serde
from langgraph.constants import (
CONF,
CONFIG_KEY_DEDUPE_TASKS,
CONFIG_KEY_ENSURE_LATEST,
CONFIG_KEY_SCRATCHPAD,
INTERRUPT,
SCHEDULED,
)
Expand Down Expand Up @@ -168,23 +170,24 @@ async def attempt(self, msg: MessageToOrchestrator) -> None:
if new_tasks := [
t for t in loop.tasks.values() if not t.scheduled and not t.writes
]:
config = patch_configurable(
loop.config,
{
**loop.checkpoint_config["configurable"],
CONFIG_KEY_DEDUPE_TASKS: True,
CONFIG_KEY_ENSURE_LATEST: True,
},
)
if CONFIG_KEY_SCRATCHPAD in config[CONF]:
config[CONF][CONFIG_KEY_SCRATCHPAD]["subgraph_counter"] = 0
# send messages to executor
futures = await asyncio.gather(
*(
self.producer.send(
self.topics.executor,
value=serde.dumps(
MessageToExecutor(
config=patch_configurable(
loop.config,
{
**loop.checkpoint_config[
"configurable"
],
CONFIG_KEY_DEDUPE_TASKS: True,
CONFIG_KEY_ENSURE_LATEST: True,
},
),
config=config,
task=ExecutorTask(id=task.id, path=task.path),
finally_send=msg.get("finally_send"),
)
Expand Down
8 changes: 8 additions & 0 deletions libs/scheduler-kafka/tests/any.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,11 @@ def __eq__(self, other: object) -> bool:
return False
else:
return True


class AnyInt(int):
def __init__(self) -> None:
super().__init__()

def __eq__(self, other: object) -> bool:
return isinstance(other, int)
8 changes: 7 additions & 1 deletion libs/scheduler-kafka/tests/test_subgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from langgraph.pregel import Pregel
from langgraph.scheduler.kafka import serde
from langgraph.scheduler.kafka.types import MessageToOrchestrator, Topics
from tests.any import AnyDict
from tests.any import AnyDict, AnyInt
from tests.drain import drain_topics_async
from tests.messages import _AnyIdAIMessage, _AnyIdHumanMessage

Expand Down Expand Up @@ -198,6 +198,7 @@ async def test_subgraph_w_interrupt(
"__pregel_store": None,
"__pregel_task_id": history[0].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -269,6 +270,7 @@ async def test_subgraph_w_interrupt(
"__pregel_store": None,
"__pregel_task_id": history[0].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -370,6 +372,7 @@ async def test_subgraph_w_interrupt(
"__pregel_store": None,
"__pregel_task_id": history[0].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -481,6 +484,7 @@ async def test_subgraph_w_interrupt(
"__pregel_store": None,
"__pregel_task_id": history[1].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -547,6 +551,7 @@ async def test_subgraph_w_interrupt(
"__pregel_store": None,
"__pregel_task_id": history[1].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -669,6 +674,7 @@ async def test_subgraph_w_interrupt(
"__pregel_store": None,
"__pregel_task_id": history[1].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down
8 changes: 7 additions & 1 deletion libs/scheduler-kafka/tests/test_subgraph_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from langgraph.scheduler.kafka import serde
from langgraph.scheduler.kafka.default_sync import DefaultProducer
from langgraph.scheduler.kafka.types import MessageToOrchestrator, Topics
from tests.any import AnyDict
from tests.any import AnyDict, AnyInt
from tests.drain import drain_topics
from tests.messages import _AnyIdAIMessage, _AnyIdHumanMessage

Expand Down Expand Up @@ -197,6 +197,7 @@ def test_subgraph_w_interrupt(
"__pregel_store": None,
"__pregel_task_id": history[0].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -268,6 +269,7 @@ def test_subgraph_w_interrupt(
"__pregel_resuming": False,
"__pregel_task_id": history[0].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -369,6 +371,7 @@ def test_subgraph_w_interrupt(
"__pregel_resuming": False,
"__pregel_task_id": history[0].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -479,6 +482,7 @@ def test_subgraph_w_interrupt(
"__pregel_resuming": True,
"__pregel_task_id": history[1].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -545,6 +549,7 @@ def test_subgraph_w_interrupt(
"__pregel_resuming": True,
"__pregel_task_id": history[1].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down Expand Up @@ -667,6 +672,7 @@ def test_subgraph_w_interrupt(
"__pregel_store": None,
"__pregel_task_id": history[1].tasks[0].id,
"__pregel_scratchpad": {
"subgraph_counter": AnyInt(),
"call_counter": 0,
"interrupt_counter": -1,
"null_resume": None,
Expand Down

0 comments on commit c26b0e7

Please sign in to comment.