From 8db2fc699e7f2c555afbbf64df8ef5d7452eea5b Mon Sep 17 00:00:00 2001 From: vbarda Date: Thu, 19 Dec 2024 17:21:57 -0500 Subject: [PATCH] add async --- .../langgraph/checkpoint/postgres/aio.py | 3 +- .../langgraph/checkpoint/postgres/shallow.py | 493 ++++++- libs/checkpoint-postgres/tests/test_async.py | 39 +- .../tests/__snapshots__/test_large_cases.ambr | 40 +- .../__snapshots__/test_large_cases_async.ambr | 151 +++ .../tests/__snapshots__/test_pregel.ambr | 16 +- .../__snapshots__/test_pregel_async.ambr | 136 ++ libs/langgraph/tests/conftest.py | 44 +- libs/langgraph/tests/test_large_cases.py | 12 +- .../langgraph/tests/test_large_cases_async.py | 1142 +++++++++++------ libs/langgraph/tests/test_pregel.py | 17 +- libs/langgraph/tests/test_pregel_async.py | 185 ++- 12 files changed, 1741 insertions(+), 537 deletions(-) create mode 100644 libs/langgraph/tests/__snapshots__/test_large_cases_async.ambr diff --git a/libs/checkpoint-postgres/langgraph/checkpoint/postgres/aio.py b/libs/checkpoint-postgres/langgraph/checkpoint/postgres/aio.py index 3a1e13db1..a5a82b185 100644 --- a/libs/checkpoint-postgres/langgraph/checkpoint/postgres/aio.py +++ b/libs/checkpoint-postgres/langgraph/checkpoint/postgres/aio.py @@ -19,6 +19,7 @@ ) from langgraph.checkpoint.postgres import _ainternal from langgraph.checkpoint.postgres.base import BasePostgresSaver +from langgraph.checkpoint.postgres.shallow import AsyncShallowPostgresSaver from langgraph.checkpoint.serde.base import SerializerProtocol Conn = _ainternal.Conn # For backward compatibility @@ -464,4 +465,4 @@ def put_writes( ).result() -__all__ = ["AsyncPostgresSaver", "Conn"] +__all__ = ["AsyncPostgresSaver", "AsyncShallowPostgresSaver", "Conn"] diff --git a/libs/checkpoint-postgres/langgraph/checkpoint/postgres/shallow.py b/libs/checkpoint-postgres/langgraph/checkpoint/postgres/shallow.py index 305c8e23c..caaccab37 100644 --- a/libs/checkpoint-postgres/langgraph/checkpoint/postgres/shallow.py +++ b/libs/checkpoint-postgres/langgraph/checkpoint/postgres/shallow.py @@ -1,13 +1,22 @@ +import asyncio import threading -from collections.abc import Iterator, Sequence -from contextlib import contextmanager +from collections.abc import AsyncIterator, Iterator, Sequence +from contextlib import asynccontextmanager, contextmanager from typing import Any, Optional from langchain_core.runnables import RunnableConfig -from psycopg import Capabilities, Connection, Cursor, Pipeline +from psycopg import ( + AsyncConnection, + AsyncCursor, + AsyncPipeline, + Capabilities, + Connection, + Cursor, + Pipeline, +) from psycopg.rows import DictRow, dict_row from psycopg.types.json import Jsonb -from psycopg_pool import ConnectionPool +from psycopg_pool import AsyncConnectionPool, ConnectionPool from langgraph.checkpoint.base import ( WRITES_IDX_MAP, @@ -16,7 +25,7 @@ CheckpointMetadata, CheckpointTuple, ) -from langgraph.checkpoint.postgres import _internal +from langgraph.checkpoint.postgres import _ainternal, _internal from langgraph.checkpoint.postgres.base import BasePostgresSaver from langgraph.checkpoint.serde.base import SerializerProtocol from langgraph.checkpoint.serde.types import TASKS @@ -131,6 +140,27 @@ """ +def _dump_blobs( + serde: SerializerProtocol, + thread_id: str, + checkpoint_ns: str, + values: dict[str, Any], + versions: ChannelVersions, +) -> list[tuple[str, str, str, str, str, Optional[bytes]]]: + if not versions: + return [] + + return [ + ( + thread_id, + checkpoint_ns, + k, + *(serde.dumps_typed(values[k]) if k in values else ("empty", None)), + ) + for k in versions + ] + + class ShallowPostgresSaver(BasePostgresSaver): """A checkpoint saver that uses Postgres to store checkpoints. @@ -165,30 +195,6 @@ def __init__( self.lock = threading.Lock() self.supports_pipeline = Capabilities().has_pipeline() - def _dump_blobs( - self, - thread_id: str, - checkpoint_ns: str, - values: dict[str, Any], - versions: ChannelVersions, - ) -> list[tuple[str, str, str, str, str, Optional[bytes]]]: - if not versions: - return [] - - return [ - ( - thread_id, - checkpoint_ns, - k, - *( - self.serde.dumps_typed(values[k]) - if k in values - else ("empty", None) - ), - ) - for k in versions - ] - @classmethod @contextmanager def from_conn_string( @@ -399,7 +405,8 @@ def put( ) cur.executemany( self.UPSERT_CHECKPOINT_BLOBS_SQL, - self._dump_blobs( + _dump_blobs( + self.serde, thread_id, checkpoint_ns, copy.pop("channel_values"), # type: ignore[misc] @@ -490,3 +497,429 @@ def _cursor(self, *, pipeline: bool = False) -> Iterator[Cursor[DictRow]]: else: with self.lock, conn.cursor(binary=True, row_factory=dict_row) as cur: yield cur + + +class AsyncShallowPostgresSaver(BasePostgresSaver): + """A checkpoint saver that uses Postgres to store checkpoints asynchronously. + + This checkpointer ONLY stores the most recent checkpoint and does NOT retain any history. + It is meant to be a light-weight drop-in replacement for the AsyncPostgresSaver that + supports most of the LangGraph persistence functionality with the exception of time travel. + """ + + SELECT_SQL = SELECT_SQL + MIGRATIONS = MIGRATIONS + UPSERT_CHECKPOINT_BLOBS_SQL = UPSERT_CHECKPOINT_BLOBS_SQL + UPSERT_CHECKPOINTS_SQL = UPSERT_CHECKPOINTS_SQL + UPSERT_CHECKPOINT_WRITES_SQL = UPSERT_CHECKPOINT_WRITES_SQL + INSERT_CHECKPOINT_WRITES_SQL = INSERT_CHECKPOINT_WRITES_SQL + lock: asyncio.Lock + + def __init__( + self, + conn: _ainternal.Conn, + pipe: Optional[AsyncPipeline] = None, + serde: Optional[SerializerProtocol] = None, + ) -> None: + super().__init__(serde=serde) + if isinstance(conn, AsyncConnectionPool) and pipe is not None: + raise ValueError( + "Pipeline should be used only with a single AsyncConnection, not AsyncConnectionPool." + ) + + self.conn = conn + self.pipe = pipe + self.lock = asyncio.Lock() + self.loop = asyncio.get_running_loop() + self.supports_pipeline = Capabilities().has_pipeline() + + @classmethod + @asynccontextmanager + async def from_conn_string( + cls, + conn_string: str, + *, + pipeline: bool = False, + serde: Optional[SerializerProtocol] = None, + ) -> AsyncIterator["AsyncShallowPostgresSaver"]: + """Create a new AsyncShallowPostgresSaver instance from a connection string. + + Args: + conn_string (str): The Postgres connection info string. + pipeline (bool): whether to use AsyncPipeline + + Returns: + AsyncShallowPostgresSaver: A new AsyncShallowPostgresSaver instance. + """ + async with await AsyncConnection.connect( + conn_string, autocommit=True, prepare_threshold=0, row_factory=dict_row + ) as conn: + if pipeline: + async with conn.pipeline() as pipe: + yield cls(conn=conn, pipe=pipe, serde=serde) + else: + yield cls(conn=conn, serde=serde) + + async def setup(self) -> None: + """Set up the checkpoint database asynchronously. + + This method creates the necessary tables in the Postgres database if they don't + already exist and runs database migrations. It MUST be called directly by the user + the first time checkpointer is used. + """ + async with self._cursor() as cur: + await cur.execute(self.MIGRATIONS[0]) + results = await cur.execute( + "SELECT v FROM checkpoint_migrations ORDER BY v DESC LIMIT 1" + ) + row = await results.fetchone() + if row is None: + version = -1 + else: + version = row["v"] + for v, migration in zip( + range(version + 1, len(self.MIGRATIONS)), + self.MIGRATIONS[version + 1 :], + ): + await cur.execute(migration) + await cur.execute(f"INSERT INTO checkpoint_migrations (v) VALUES ({v})") + if self.pipe: + await self.pipe.sync() + + async def alist( + self, + config: Optional[RunnableConfig], + *, + filter: Optional[dict[str, Any]] = None, + before: Optional[RunnableConfig] = None, + limit: Optional[int] = None, + ) -> AsyncIterator[CheckpointTuple]: + """List checkpoints from the database asynchronously. + + This method retrieves a list of checkpoint tuples from the Postgres database based + on the provided config. For ShallowPostgresSaver, this method returns a list with + ONLY the most recent checkpoint. + """ + where, args = self._search_where(config, filter, before) + query = self.SELECT_SQL + where + if limit: + query += f" LIMIT {limit}" + async with self._cursor() as cur: + await cur.execute(self.SELECT_SQL + where, args, binary=True) + async for value in cur: + checkpoint = await asyncio.to_thread( + self._load_checkpoint, + value["checkpoint"], + value["channel_values"], + value["pending_sends"], + ) + yield CheckpointTuple( + config={ + "configurable": { + "thread_id": value["thread_id"], + "checkpoint_ns": value["checkpoint_ns"], + "checkpoint_id": checkpoint["id"], + } + }, + checkpoint=checkpoint, + metadata=self._load_metadata(value["metadata"]), + pending_writes=await asyncio.to_thread( + self._load_writes, value["pending_writes"] + ), + ) + + async def aget_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: + """Get a checkpoint tuple from the database asynchronously. + + This method retrieves a checkpoint tuple from the Postgres database based on the + provided config (matching the thread ID in the config). + + Args: + config (RunnableConfig): The config to use for retrieving the checkpoint. + + Returns: + Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if no matching checkpoint was found. + """ + thread_id = config["configurable"]["thread_id"] + checkpoint_ns = config["configurable"].get("checkpoint_ns", "") + args = (thread_id, checkpoint_ns) + where = "WHERE thread_id = %s AND checkpoint_ns = %s" + + async with self._cursor() as cur: + await cur.execute( + self.SELECT_SQL + where, + args, + binary=True, + ) + + async for value in cur: + checkpoint = await asyncio.to_thread( + self._load_checkpoint, + value["checkpoint"], + value["channel_values"], + value["pending_sends"], + ) + return CheckpointTuple( + config={ + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint["id"], + } + }, + checkpoint=checkpoint, + metadata=self._load_metadata(value["metadata"]), + pending_writes=await asyncio.to_thread( + self._load_writes, value["pending_writes"] + ), + ) + + async def aput( + self, + config: RunnableConfig, + checkpoint: Checkpoint, + metadata: CheckpointMetadata, + new_versions: ChannelVersions, + ) -> RunnableConfig: + """Save a checkpoint to the database asynchronously. + + This method saves a checkpoint to the Postgres database. The checkpoint is associated + with the provided config and its parent config (if any). + + Args: + config (RunnableConfig): The config to associate with the checkpoint. + checkpoint (Checkpoint): The checkpoint to save. + metadata (CheckpointMetadata): Additional metadata to save with the checkpoint. + new_versions (ChannelVersions): New channel versions as of this write. + + Returns: + RunnableConfig: Updated configuration after storing the checkpoint. + """ + configurable = config["configurable"].copy() + thread_id = configurable.pop("thread_id") + checkpoint_ns = configurable.pop("checkpoint_ns") + + copy = checkpoint.copy() + next_config = { + "configurable": { + "thread_id": thread_id, + "checkpoint_ns": checkpoint_ns, + "checkpoint_id": checkpoint["id"], + } + } + + async with self._cursor(pipeline=True) as cur: + await cur.execute( + """DELETE FROM checkpoint_writes + WHERE thread_id = %s AND checkpoint_ns = %s AND checkpoint_id NOT IN (%s, %s)""", + ( + thread_id, + checkpoint_ns, + checkpoint["id"], + configurable.get("checkpoint_id", ""), + ), + ) + await cur.executemany( + self.UPSERT_CHECKPOINT_BLOBS_SQL, + _dump_blobs( + self.serde, + thread_id, + checkpoint_ns, + copy.pop("channel_values"), # type: ignore[misc] + new_versions, + ), + ) + await cur.execute( + self.UPSERT_CHECKPOINTS_SQL, + ( + thread_id, + checkpoint_ns, + Jsonb(self._dump_checkpoint(copy)), + self._dump_metadata(metadata), + ), + ) + return next_config + + async def aput_writes( + self, + config: RunnableConfig, + writes: Sequence[tuple[str, Any]], + task_id: str, + ) -> None: + """Store intermediate writes linked to a checkpoint asynchronously. + + This method saves intermediate writes associated with a checkpoint to the database. + + Args: + config (RunnableConfig): Configuration of the related checkpoint. + writes (Sequence[Tuple[str, Any]]): List of writes to store, each as (channel, value) pair. + task_id (str): Identifier for the task creating the writes. + """ + query = ( + self.UPSERT_CHECKPOINT_WRITES_SQL + if all(w[0] in WRITES_IDX_MAP for w in writes) + else self.INSERT_CHECKPOINT_WRITES_SQL + ) + params = await asyncio.to_thread( + self._dump_writes, + config["configurable"]["thread_id"], + config["configurable"]["checkpoint_ns"], + config["configurable"]["checkpoint_id"], + task_id, + writes, + ) + async with self._cursor(pipeline=True) as cur: + await cur.executemany(query, params) + + @asynccontextmanager + async def _cursor( + self, *, pipeline: bool = False + ) -> AsyncIterator[AsyncCursor[DictRow]]: + """Create a database cursor as a context manager. + + Args: + pipeline (bool): whether to use pipeline for the DB operations inside the context manager. + Will be applied regardless of whether the AsyncShallowPostgresSaver instance was initialized with a pipeline. + If pipeline mode is not supported, will fall back to using transaction context manager. + """ + async with _ainternal.get_connection(self.conn) as conn: + if self.pipe: + # a connection in pipeline mode can be used concurrently + # in multiple threads/coroutines, but only one cursor can be + # used at a time + try: + async with conn.cursor(binary=True, row_factory=dict_row) as cur: + yield cur + finally: + if pipeline: + await self.pipe.sync() + elif pipeline: + # a connection not in pipeline mode can only be used by one + # thread/coroutine at a time, so we acquire a lock + if self.supports_pipeline: + async with ( + self.lock, + conn.pipeline(), + conn.cursor(binary=True, row_factory=dict_row) as cur, + ): + yield cur + else: + # Use connection's transaction context manager when pipeline mode not supported + async with ( + self.lock, + conn.transaction(), + conn.cursor(binary=True, row_factory=dict_row) as cur, + ): + yield cur + else: + async with ( + self.lock, + conn.cursor(binary=True, row_factory=dict_row) as cur, + ): + yield cur + + def list( + self, + config: Optional[RunnableConfig], + *, + filter: Optional[dict[str, Any]] = None, + before: Optional[RunnableConfig] = None, + limit: Optional[int] = None, + ) -> Iterator[CheckpointTuple]: + """List checkpoints from the database. + + This method retrieves a list of checkpoint tuples from the Postgres database based + on the provided config. The checkpoints are ordered by checkpoint ID in descending order (newest first). + + Args: + config (Optional[RunnableConfig]): Base configuration for filtering checkpoints. + filter (Optional[Dict[str, Any]]): Additional filtering criteria for metadata. + before (Optional[RunnableConfig]): If provided, only checkpoints before the specified checkpoint ID are returned. Defaults to None. + limit (Optional[int]): Maximum number of checkpoints to return. + + Yields: + Iterator[CheckpointTuple]: An iterator of matching checkpoint tuples. + """ + aiter_ = self.alist(config, filter=filter, before=before, limit=limit) + while True: + try: + yield asyncio.run_coroutine_threadsafe( + anext(aiter_), # noqa: F821 + self.loop, + ).result() + except StopAsyncIteration: + break + + def get_tuple(self, config: RunnableConfig) -> Optional[CheckpointTuple]: + """Get a checkpoint tuple from the database. + + This method retrieves a checkpoint tuple from the Postgres database based on the + provided config. If the config contains a "checkpoint_id" key, the checkpoint with + the matching thread ID and "checkpoint_id" is retrieved. Otherwise, the latest checkpoint + for the given thread ID is retrieved. + + Args: + config (RunnableConfig): The config to use for retrieving the checkpoint. + + Returns: + Optional[CheckpointTuple]: The retrieved checkpoint tuple, or None if no matching checkpoint was found. + """ + try: + # check if we are in the main thread, only bg threads can block + # we don't check in other methods to avoid the overhead + if asyncio.get_running_loop() is self.loop: + raise asyncio.InvalidStateError( + "Synchronous calls to AsyncShallowPostgresSaver are only allowed from a " + "different thread. From the main thread, use the async interface." + "For example, use `await checkpointer.aget_tuple(...)` or `await " + "graph.ainvoke(...)`." + ) + except RuntimeError: + pass + return asyncio.run_coroutine_threadsafe( + self.aget_tuple(config), self.loop + ).result() + + def put( + self, + config: RunnableConfig, + checkpoint: Checkpoint, + metadata: CheckpointMetadata, + new_versions: ChannelVersions, + ) -> RunnableConfig: + """Save a checkpoint to the database. + + This method saves a checkpoint to the Postgres database. The checkpoint is associated + with the provided config and its parent config (if any). + + Args: + config (RunnableConfig): The config to associate with the checkpoint. + checkpoint (Checkpoint): The checkpoint to save. + metadata (CheckpointMetadata): Additional metadata to save with the checkpoint. + new_versions (ChannelVersions): New channel versions as of this write. + + Returns: + RunnableConfig: Updated configuration after storing the checkpoint. + """ + return asyncio.run_coroutine_threadsafe( + self.aput(config, checkpoint, metadata, new_versions), self.loop + ).result() + + def put_writes( + self, + config: RunnableConfig, + writes: Sequence[tuple[str, Any]], + task_id: str, + ) -> None: + """Store intermediate writes linked to a checkpoint. + + This method saves intermediate writes associated with a checkpoint to the database. + + Args: + config (RunnableConfig): Configuration of the related checkpoint. + writes (Sequence[Tuple[str, Any]]): List of writes to store, each as (channel, value) pair. + task_id (str): Identifier for the task creating the writes. + """ + return asyncio.run_coroutine_threadsafe( + self.aput_writes(config, writes, task_id), self.loop + ).result() diff --git a/libs/checkpoint-postgres/tests/test_async.py b/libs/checkpoint-postgres/tests/test_async.py index 67848707c..7590012dd 100644 --- a/libs/checkpoint-postgres/tests/test_async.py +++ b/libs/checkpoint-postgres/tests/test_async.py @@ -16,7 +16,10 @@ create_checkpoint, empty_checkpoint, ) -from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver +from langgraph.checkpoint.postgres.aio import ( + AsyncPostgresSaver, + AsyncShallowPostgresSaver, +) from tests.conftest import DEFAULT_POSTGRES_URI @@ -103,11 +106,41 @@ async def _base_saver(): await conn.execute(f"DROP DATABASE {database}") +@asynccontextmanager +async def _shallow_saver(): + """Fixture for shallow connection mode testing.""" + database = f"test_{uuid4().hex[:16]}" + # create unique db + async with await AsyncConnection.connect( + DEFAULT_POSTGRES_URI, autocommit=True + ) as conn: + await conn.execute(f"CREATE DATABASE {database}") + try: + async with await AsyncConnection.connect( + DEFAULT_POSTGRES_URI + database, + autocommit=True, + prepare_threshold=0, + row_factory=dict_row, + ) as conn: + checkpointer = AsyncShallowPostgresSaver(conn) + await checkpointer.setup() + yield checkpointer + finally: + # drop unique db + async with await AsyncConnection.connect( + DEFAULT_POSTGRES_URI, autocommit=True + ) as conn: + await conn.execute(f"DROP DATABASE {database}") + + @asynccontextmanager async def _saver(name: str): if name == "base": async with _base_saver() as saver: yield saver + elif name == "shallow": + async with _shallow_saver() as saver: + yield saver elif name == "pool": async with _pool_saver() as saver: yield saver @@ -167,7 +200,7 @@ def test_data(): } -@pytest.mark.parametrize("saver_name", ["base", "pool", "pipe"]) +@pytest.mark.parametrize("saver_name", ["base", "pool", "pipe", "shallow"]) async def test_asearch(request, saver_name: str, test_data) -> None: async with _saver(saver_name) as saver: configs = test_data["configs"] @@ -212,7 +245,7 @@ async def test_asearch(request, saver_name: str, test_data) -> None: } == {"", "inner"} -@pytest.mark.parametrize("saver_name", ["base", "pool", "pipe"]) +@pytest.mark.parametrize("saver_name", ["base", "pool", "pipe", "shallow"]) async def test_null_chars(request, saver_name: str, test_data) -> None: async with _saver(saver_name) as saver: config = await saver.aput( diff --git a/libs/langgraph/tests/__snapshots__/test_large_cases.ambr b/libs/langgraph/tests/__snapshots__/test_large_cases.ambr index 74a207eb9..4bbc28ffb 100644 --- a/libs/langgraph/tests/__snapshots__/test_large_cases.ambr +++ b/libs/langgraph/tests/__snapshots__/test_large_cases.ambr @@ -169,7 +169,7 @@ ''' # --- -# name: test_branch_then[shallow_postgres] +# name: test_branch_then[postgres_shallow] ''' graph TD; __start__ --> prepare; @@ -181,7 +181,7 @@ ''' # --- -# name: test_branch_then[shallow_postgres].1 +# name: test_branch_then[postgres_shallow].1 ''' %%{init: {'flowchart': {'curve': 'linear'}}}%% graph TD; @@ -1612,7 +1612,7 @@ ''' # --- -# name: test_conditional_graph[shallow_postgres] +# name: test_conditional_graph[postgres_shallow] ''' { "nodes": [ @@ -1683,7 +1683,7 @@ } ''' # --- -# name: test_conditional_graph[shallow_postgres].1 +# name: test_conditional_graph[postgres_shallow].1 ''' graph TD; __start__ --> agent; @@ -1693,7 +1693,7 @@ ''' # --- -# name: test_conditional_graph[shallow_postgres].2 +# name: test_conditional_graph[postgres_shallow].2 ''' %%{init: {'flowchart': {'curve': 'linear'}}}%% graph TD; @@ -1713,7 +1713,7 @@ ''' # --- -# name: test_conditional_graph[shallow_postgres].3 +# name: test_conditional_graph[postgres_shallow].3 ''' { "nodes": [ @@ -1784,7 +1784,7 @@ } ''' # --- -# name: test_conditional_graph[shallow_postgres].4 +# name: test_conditional_graph[postgres_shallow].4 ''' graph TD; __start__ --> agent; @@ -1794,7 +1794,7 @@ ''' # --- -# name: test_conditional_graph[shallow_postgres].5 +# name: test_conditional_graph[postgres_shallow].5 dict({ 'edges': list([ dict({ @@ -1867,7 +1867,7 @@ ]), }) # --- -# name: test_conditional_graph[shallow_postgres].6 +# name: test_conditional_graph[postgres_shallow].6 ''' %%{init: {'flowchart': {'curve': 'linear'}}}%% graph TD; @@ -2572,13 +2572,13 @@ ''' # --- -# name: test_conditional_state_graph[shallow_postgres] +# name: test_conditional_state_graph[postgres_shallow] '{"$defs": {"AgentAction": {"description": "Represents a request to execute an action by an agent.\\n\\nThe action consists of the name of the tool to execute and the input to pass\\nto the tool. The log is used to pass along extra information about the action.", "properties": {"tool": {"title": "Tool", "type": "string"}, "tool_input": {"anyOf": [{"type": "string"}, {"type": "object"}], "title": "Tool Input"}, "log": {"title": "Log", "type": "string"}, "type": {"const": "AgentAction", "default": "AgentAction", "enum": ["AgentAction"], "title": "Type", "type": "string"}}, "required": ["tool", "tool_input", "log"], "title": "AgentAction", "type": "object"}, "AgentFinish": {"description": "Final return value of an ActionAgent.\\n\\nAgents return an AgentFinish when they have reached a stopping condition.", "properties": {"return_values": {"title": "Return Values", "type": "object"}, "log": {"title": "Log", "type": "string"}, "type": {"const": "AgentFinish", "default": "AgentFinish", "enum": ["AgentFinish"], "title": "Type", "type": "string"}}, "required": ["return_values", "log"], "title": "AgentFinish", "type": "object"}}, "properties": {"input": {"default": null, "title": "Input", "type": "string"}, "agent_outcome": {"anyOf": [{"$ref": "#/$defs/AgentAction"}, {"$ref": "#/$defs/AgentFinish"}, {"type": "null"}], "default": null, "title": "Agent Outcome"}, "intermediate_steps": {"default": null, "items": {"maxItems": 2, "minItems": 2, "prefixItems": [{"$ref": "#/$defs/AgentAction"}, {"type": "string"}], "type": "array"}, "title": "Intermediate Steps", "type": "array"}}, "title": "LangGraphInput", "type": "object"}' # --- -# name: test_conditional_state_graph[shallow_postgres].1 +# name: test_conditional_state_graph[postgres_shallow].1 '{"$defs": {"AgentAction": {"description": "Represents a request to execute an action by an agent.\\n\\nThe action consists of the name of the tool to execute and the input to pass\\nto the tool. The log is used to pass along extra information about the action.", "properties": {"tool": {"title": "Tool", "type": "string"}, "tool_input": {"anyOf": [{"type": "string"}, {"type": "object"}], "title": "Tool Input"}, "log": {"title": "Log", "type": "string"}, "type": {"const": "AgentAction", "default": "AgentAction", "enum": ["AgentAction"], "title": "Type", "type": "string"}}, "required": ["tool", "tool_input", "log"], "title": "AgentAction", "type": "object"}, "AgentFinish": {"description": "Final return value of an ActionAgent.\\n\\nAgents return an AgentFinish when they have reached a stopping condition.", "properties": {"return_values": {"title": "Return Values", "type": "object"}, "log": {"title": "Log", "type": "string"}, "type": {"const": "AgentFinish", "default": "AgentFinish", "enum": ["AgentFinish"], "title": "Type", "type": "string"}}, "required": ["return_values", "log"], "title": "AgentFinish", "type": "object"}}, "properties": {"input": {"default": null, "title": "Input", "type": "string"}, "agent_outcome": {"anyOf": [{"$ref": "#/$defs/AgentAction"}, {"$ref": "#/$defs/AgentFinish"}, {"type": "null"}], "default": null, "title": "Agent Outcome"}, "intermediate_steps": {"default": null, "items": {"maxItems": 2, "minItems": 2, "prefixItems": [{"$ref": "#/$defs/AgentAction"}, {"type": "string"}], "type": "array"}, "title": "Intermediate Steps", "type": "array"}}, "title": "LangGraphOutput", "type": "object"}' # --- -# name: test_conditional_state_graph[shallow_postgres].2 +# name: test_conditional_state_graph[postgres_shallow].2 ''' { "nodes": [ @@ -2644,7 +2644,7 @@ } ''' # --- -# name: test_conditional_state_graph[shallow_postgres].3 +# name: test_conditional_state_graph[postgres_shallow].3 ''' graph TD; __start__ --> agent; @@ -3141,13 +3141,13 @@ ''' # --- -# name: test_message_graph[shallow_postgres] +# name: test_message_graph[postgres_shallow] '{"$defs": {"AIMessage": {"additionalProperties": true, "description": "Message from an AI.\\n\\nAIMessage is returned from a chat model as a response to a prompt.\\n\\nThis message represents the output of the model and consists of both\\nthe raw output as returned by the model together standardized fields\\n(e.g., tool calls, usage metadata) added by the LangChain framework.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "ai", "default": "ai", "enum": ["ai"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "example": {"default": false, "title": "Example", "type": "boolean"}, "tool_calls": {"default": [], "items": {"$ref": "#/$defs/ToolCall"}, "title": "Tool Calls", "type": "array"}, "invalid_tool_calls": {"default": [], "items": {"$ref": "#/$defs/InvalidToolCall"}, "title": "Invalid Tool Calls", "type": "array"}, "usage_metadata": {"anyOf": [{"$ref": "#/$defs/UsageMetadata"}, {"type": "null"}], "default": null}}, "required": ["content"], "title": "AIMessage", "type": "object"}, "AIMessageChunk": {"additionalProperties": true, "description": "Message chunk from an AI.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "AIMessageChunk", "default": "AIMessageChunk", "enum": ["AIMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "example": {"default": false, "title": "Example", "type": "boolean"}, "tool_calls": {"default": [], "items": {"$ref": "#/$defs/ToolCall"}, "title": "Tool Calls", "type": "array"}, "invalid_tool_calls": {"default": [], "items": {"$ref": "#/$defs/InvalidToolCall"}, "title": "Invalid Tool Calls", "type": "array"}, "usage_metadata": {"anyOf": [{"$ref": "#/$defs/UsageMetadata"}, {"type": "null"}], "default": null}, "tool_call_chunks": {"default": [], "items": {"$ref": "#/$defs/ToolCallChunk"}, "title": "Tool Call Chunks", "type": "array"}}, "required": ["content"], "title": "AIMessageChunk", "type": "object"}, "ChatMessage": {"additionalProperties": true, "description": "Message that can be assigned an arbitrary speaker (i.e. role).", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "chat", "default": "chat", "enum": ["chat"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "role": {"title": "Role", "type": "string"}}, "required": ["content", "role"], "title": "ChatMessage", "type": "object"}, "ChatMessageChunk": {"additionalProperties": true, "description": "Chat Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "ChatMessageChunk", "default": "ChatMessageChunk", "enum": ["ChatMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "role": {"title": "Role", "type": "string"}}, "required": ["content", "role"], "title": "ChatMessageChunk", "type": "object"}, "FunctionMessage": {"additionalProperties": true, "description": "Message for passing the result of executing a tool back to a model.\\n\\nFunctionMessage are an older version of the ToolMessage schema, and\\ndo not contain the tool_call_id field.\\n\\nThe tool_call_id field is used to associate the tool call request with the\\ntool call response. This is useful in situations where a chat model is able\\nto request multiple tool calls in parallel.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "function", "default": "function", "enum": ["function"], "title": "Type", "type": "string"}, "name": {"title": "Name", "type": "string"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}}, "required": ["content", "name"], "title": "FunctionMessage", "type": "object"}, "FunctionMessageChunk": {"additionalProperties": true, "description": "Function Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "FunctionMessageChunk", "default": "FunctionMessageChunk", "enum": ["FunctionMessageChunk"], "title": "Type", "type": "string"}, "name": {"title": "Name", "type": "string"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}}, "required": ["content", "name"], "title": "FunctionMessageChunk", "type": "object"}, "HumanMessage": {"additionalProperties": true, "description": "Message from a human.\\n\\nHumanMessages are messages that are passed in from a human to the model.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n from langchain_core.messages import HumanMessage, SystemMessage\\n\\n messages = [\\n SystemMessage(\\n content=\\"You are a helpful assistant! Your name is Bob.\\"\\n ),\\n HumanMessage(\\n content=\\"What is your name?\\"\\n )\\n ]\\n\\n # Instantiate a chat model and invoke it with the messages\\n model = ...\\n print(model.invoke(messages))", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "human", "default": "human", "enum": ["human"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "example": {"default": false, "title": "Example", "type": "boolean"}}, "required": ["content"], "title": "HumanMessage", "type": "object"}, "HumanMessageChunk": {"additionalProperties": true, "description": "Human Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "HumanMessageChunk", "default": "HumanMessageChunk", "enum": ["HumanMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "example": {"default": false, "title": "Example", "type": "boolean"}}, "required": ["content"], "title": "HumanMessageChunk", "type": "object"}, "InputTokenDetails": {"description": "Breakdown of input token counts.\\n\\nDoes *not* need to sum to full input token count. Does *not* need to have all keys.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n {\\n \\"audio\\": 10,\\n \\"cache_creation\\": 200,\\n \\"cache_read\\": 100,\\n }\\n\\n.. versionadded:: 0.3.9", "properties": {"audio": {"title": "Audio", "type": "integer"}, "cache_creation": {"title": "Cache Creation", "type": "integer"}, "cache_read": {"title": "Cache Read", "type": "integer"}}, "title": "InputTokenDetails", "type": "object"}, "InvalidToolCall": {"description": "Allowance for errors made by LLM.\\n\\nHere we add an `error` key to surface errors made during generation\\n(e.g., invalid JSON arguments.)", "properties": {"name": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Name"}, "args": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Args"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Id"}, "error": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Error"}, "type": {"const": "invalid_tool_call", "enum": ["invalid_tool_call"], "title": "Type", "type": "string"}}, "required": ["name", "args", "id", "error"], "title": "InvalidToolCall", "type": "object"}, "OutputTokenDetails": {"description": "Breakdown of output token counts.\\n\\nDoes *not* need to sum to full output token count. Does *not* need to have all keys.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n {\\n \\"audio\\": 10,\\n \\"reasoning\\": 200,\\n }\\n\\n.. versionadded:: 0.3.9", "properties": {"audio": {"title": "Audio", "type": "integer"}, "reasoning": {"title": "Reasoning", "type": "integer"}}, "title": "OutputTokenDetails", "type": "object"}, "SystemMessage": {"additionalProperties": true, "description": "Message for priming AI behavior.\\n\\nThe system message is usually passed in as the first of a sequence\\nof input messages.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n from langchain_core.messages import HumanMessage, SystemMessage\\n\\n messages = [\\n SystemMessage(\\n content=\\"You are a helpful assistant! Your name is Bob.\\"\\n ),\\n HumanMessage(\\n content=\\"What is your name?\\"\\n )\\n ]\\n\\n # Define a chat model and invoke it with the messages\\n print(model.invoke(messages))", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "system", "default": "system", "enum": ["system"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}}, "required": ["content"], "title": "SystemMessage", "type": "object"}, "SystemMessageChunk": {"additionalProperties": true, "description": "System Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "SystemMessageChunk", "default": "SystemMessageChunk", "enum": ["SystemMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}}, "required": ["content"], "title": "SystemMessageChunk", "type": "object"}, "ToolCall": {"description": "Represents a request to call a tool.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n {\\n \\"name\\": \\"foo\\",\\n \\"args\\": {\\"a\\": 1},\\n \\"id\\": \\"123\\"\\n }\\n\\n This represents a request to call the tool named \\"foo\\" with arguments {\\"a\\": 1}\\n and an identifier of \\"123\\".", "properties": {"name": {"title": "Name", "type": "string"}, "args": {"title": "Args", "type": "object"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Id"}, "type": {"const": "tool_call", "enum": ["tool_call"], "title": "Type", "type": "string"}}, "required": ["name", "args", "id"], "title": "ToolCall", "type": "object"}, "ToolCallChunk": {"description": "A chunk of a tool call (e.g., as part of a stream).\\n\\nWhen merging ToolCallChunks (e.g., via AIMessageChunk.__add__),\\nall string attributes are concatenated. Chunks are only merged if their\\nvalues of `index` are equal and not None.\\n\\nExample:\\n\\n.. code-block:: python\\n\\n left_chunks = [ToolCallChunk(name=\\"foo\\", args=\'{\\"a\\":\', index=0)]\\n right_chunks = [ToolCallChunk(name=None, args=\'1}\', index=0)]\\n\\n (\\n AIMessageChunk(content=\\"\\", tool_call_chunks=left_chunks)\\n + AIMessageChunk(content=\\"\\", tool_call_chunks=right_chunks)\\n ).tool_call_chunks == [ToolCallChunk(name=\'foo\', args=\'{\\"a\\":1}\', index=0)]", "properties": {"name": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Name"}, "args": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Args"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Id"}, "index": {"anyOf": [{"type": "integer"}, {"type": "null"}], "title": "Index"}, "type": {"const": "tool_call_chunk", "enum": ["tool_call_chunk"], "title": "Type", "type": "string"}}, "required": ["name", "args", "id", "index"], "title": "ToolCallChunk", "type": "object"}, "ToolMessage": {"additionalProperties": true, "description": "Message for passing the result of executing a tool back to a model.\\n\\nToolMessages contain the result of a tool invocation. Typically, the result\\nis encoded inside the `content` field.\\n\\nExample: A ToolMessage representing a result of 42 from a tool call with id\\n\\n .. code-block:: python\\n\\n from langchain_core.messages import ToolMessage\\n\\n ToolMessage(content=\'42\', tool_call_id=\'call_Jja7J89XsjrOLA5r!MEOW!SL\')\\n\\n\\nExample: A ToolMessage where only part of the tool output is sent to the model\\n and the full output is passed in to artifact.\\n\\n .. versionadded:: 0.2.17\\n\\n .. code-block:: python\\n\\n from langchain_core.messages import ToolMessage\\n\\n tool_output = {\\n \\"stdout\\": \\"From the graph we can see that the correlation between x and y is ...\\",\\n \\"stderr\\": None,\\n \\"artifacts\\": {\\"type\\": \\"image\\", \\"base64_data\\": \\"/9j/4gIcSU...\\"},\\n }\\n\\n ToolMessage(\\n content=tool_output[\\"stdout\\"],\\n artifact=tool_output,\\n tool_call_id=\'call_Jja7J89XsjrOLA5r!MEOW!SL\',\\n )\\n\\nThe tool_call_id field is used to associate the tool call request with the\\ntool call response. This is useful in situations where a chat model is able\\nto request multiple tool calls in parallel.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "tool", "default": "tool", "enum": ["tool"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "tool_call_id": {"title": "Tool Call Id", "type": "string"}, "artifact": {"default": null, "title": "Artifact"}, "status": {"default": "success", "enum": ["success", "error"], "title": "Status", "type": "string"}}, "required": ["content", "tool_call_id"], "title": "ToolMessage", "type": "object"}, "ToolMessageChunk": {"additionalProperties": true, "description": "Tool Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "ToolMessageChunk", "default": "ToolMessageChunk", "enum": ["ToolMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "tool_call_id": {"title": "Tool Call Id", "type": "string"}, "artifact": {"default": null, "title": "Artifact"}, "status": {"default": "success", "enum": ["success", "error"], "title": "Status", "type": "string"}}, "required": ["content", "tool_call_id"], "title": "ToolMessageChunk", "type": "object"}, "UsageMetadata": {"description": "Usage metadata for a message, such as token counts.\\n\\nThis is a standard representation of token usage that is consistent across models.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n {\\n \\"input_tokens\\": 350,\\n \\"output_tokens\\": 240,\\n \\"total_tokens\\": 590,\\n \\"input_token_details\\": {\\n \\"audio\\": 10,\\n \\"cache_creation\\": 200,\\n \\"cache_read\\": 100,\\n },\\n \\"output_token_details\\": {\\n \\"audio\\": 10,\\n \\"reasoning\\": 200,\\n }\\n }\\n\\n.. versionchanged:: 0.3.9\\n\\n Added ``input_token_details`` and ``output_token_details``.", "properties": {"input_tokens": {"title": "Input Tokens", "type": "integer"}, "output_tokens": {"title": "Output Tokens", "type": "integer"}, "total_tokens": {"title": "Total Tokens", "type": "integer"}, "input_token_details": {"$ref": "#/$defs/InputTokenDetails"}, "output_token_details": {"$ref": "#/$defs/OutputTokenDetails"}}, "required": ["input_tokens", "output_tokens", "total_tokens"], "title": "UsageMetadata", "type": "object"}}, "default": null, "items": {"oneOf": [{"$ref": "#/$defs/AIMessage"}, {"$ref": "#/$defs/HumanMessage"}, {"$ref": "#/$defs/ChatMessage"}, {"$ref": "#/$defs/SystemMessage"}, {"$ref": "#/$defs/FunctionMessage"}, {"$ref": "#/$defs/ToolMessage"}, {"$ref": "#/$defs/AIMessageChunk"}, {"$ref": "#/$defs/HumanMessageChunk"}, {"$ref": "#/$defs/ChatMessageChunk"}, {"$ref": "#/$defs/SystemMessageChunk"}, {"$ref": "#/$defs/FunctionMessageChunk"}, {"$ref": "#/$defs/ToolMessageChunk"}]}, "title": "LangGraphInput", "type": "array"}' # --- -# name: test_message_graph[shallow_postgres].1 +# name: test_message_graph[postgres_shallow].1 '{"$defs": {"AIMessage": {"additionalProperties": true, "description": "Message from an AI.\\n\\nAIMessage is returned from a chat model as a response to a prompt.\\n\\nThis message represents the output of the model and consists of both\\nthe raw output as returned by the model together standardized fields\\n(e.g., tool calls, usage metadata) added by the LangChain framework.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "ai", "default": "ai", "enum": ["ai"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "example": {"default": false, "title": "Example", "type": "boolean"}, "tool_calls": {"default": [], "items": {"$ref": "#/$defs/ToolCall"}, "title": "Tool Calls", "type": "array"}, "invalid_tool_calls": {"default": [], "items": {"$ref": "#/$defs/InvalidToolCall"}, "title": "Invalid Tool Calls", "type": "array"}, "usage_metadata": {"anyOf": [{"$ref": "#/$defs/UsageMetadata"}, {"type": "null"}], "default": null}}, "required": ["content"], "title": "AIMessage", "type": "object"}, "AIMessageChunk": {"additionalProperties": true, "description": "Message chunk from an AI.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "AIMessageChunk", "default": "AIMessageChunk", "enum": ["AIMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "example": {"default": false, "title": "Example", "type": "boolean"}, "tool_calls": {"default": [], "items": {"$ref": "#/$defs/ToolCall"}, "title": "Tool Calls", "type": "array"}, "invalid_tool_calls": {"default": [], "items": {"$ref": "#/$defs/InvalidToolCall"}, "title": "Invalid Tool Calls", "type": "array"}, "usage_metadata": {"anyOf": [{"$ref": "#/$defs/UsageMetadata"}, {"type": "null"}], "default": null}, "tool_call_chunks": {"default": [], "items": {"$ref": "#/$defs/ToolCallChunk"}, "title": "Tool Call Chunks", "type": "array"}}, "required": ["content"], "title": "AIMessageChunk", "type": "object"}, "ChatMessage": {"additionalProperties": true, "description": "Message that can be assigned an arbitrary speaker (i.e. role).", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "chat", "default": "chat", "enum": ["chat"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "role": {"title": "Role", "type": "string"}}, "required": ["content", "role"], "title": "ChatMessage", "type": "object"}, "ChatMessageChunk": {"additionalProperties": true, "description": "Chat Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "ChatMessageChunk", "default": "ChatMessageChunk", "enum": ["ChatMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "role": {"title": "Role", "type": "string"}}, "required": ["content", "role"], "title": "ChatMessageChunk", "type": "object"}, "FunctionMessage": {"additionalProperties": true, "description": "Message for passing the result of executing a tool back to a model.\\n\\nFunctionMessage are an older version of the ToolMessage schema, and\\ndo not contain the tool_call_id field.\\n\\nThe tool_call_id field is used to associate the tool call request with the\\ntool call response. This is useful in situations where a chat model is able\\nto request multiple tool calls in parallel.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "function", "default": "function", "enum": ["function"], "title": "Type", "type": "string"}, "name": {"title": "Name", "type": "string"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}}, "required": ["content", "name"], "title": "FunctionMessage", "type": "object"}, "FunctionMessageChunk": {"additionalProperties": true, "description": "Function Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "FunctionMessageChunk", "default": "FunctionMessageChunk", "enum": ["FunctionMessageChunk"], "title": "Type", "type": "string"}, "name": {"title": "Name", "type": "string"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}}, "required": ["content", "name"], "title": "FunctionMessageChunk", "type": "object"}, "HumanMessage": {"additionalProperties": true, "description": "Message from a human.\\n\\nHumanMessages are messages that are passed in from a human to the model.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n from langchain_core.messages import HumanMessage, SystemMessage\\n\\n messages = [\\n SystemMessage(\\n content=\\"You are a helpful assistant! Your name is Bob.\\"\\n ),\\n HumanMessage(\\n content=\\"What is your name?\\"\\n )\\n ]\\n\\n # Instantiate a chat model and invoke it with the messages\\n model = ...\\n print(model.invoke(messages))", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "human", "default": "human", "enum": ["human"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "example": {"default": false, "title": "Example", "type": "boolean"}}, "required": ["content"], "title": "HumanMessage", "type": "object"}, "HumanMessageChunk": {"additionalProperties": true, "description": "Human Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "HumanMessageChunk", "default": "HumanMessageChunk", "enum": ["HumanMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "example": {"default": false, "title": "Example", "type": "boolean"}}, "required": ["content"], "title": "HumanMessageChunk", "type": "object"}, "InputTokenDetails": {"description": "Breakdown of input token counts.\\n\\nDoes *not* need to sum to full input token count. Does *not* need to have all keys.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n {\\n \\"audio\\": 10,\\n \\"cache_creation\\": 200,\\n \\"cache_read\\": 100,\\n }\\n\\n.. versionadded:: 0.3.9", "properties": {"audio": {"title": "Audio", "type": "integer"}, "cache_creation": {"title": "Cache Creation", "type": "integer"}, "cache_read": {"title": "Cache Read", "type": "integer"}}, "title": "InputTokenDetails", "type": "object"}, "InvalidToolCall": {"description": "Allowance for errors made by LLM.\\n\\nHere we add an `error` key to surface errors made during generation\\n(e.g., invalid JSON arguments.)", "properties": {"name": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Name"}, "args": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Args"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Id"}, "error": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Error"}, "type": {"const": "invalid_tool_call", "enum": ["invalid_tool_call"], "title": "Type", "type": "string"}}, "required": ["name", "args", "id", "error"], "title": "InvalidToolCall", "type": "object"}, "OutputTokenDetails": {"description": "Breakdown of output token counts.\\n\\nDoes *not* need to sum to full output token count. Does *not* need to have all keys.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n {\\n \\"audio\\": 10,\\n \\"reasoning\\": 200,\\n }\\n\\n.. versionadded:: 0.3.9", "properties": {"audio": {"title": "Audio", "type": "integer"}, "reasoning": {"title": "Reasoning", "type": "integer"}}, "title": "OutputTokenDetails", "type": "object"}, "SystemMessage": {"additionalProperties": true, "description": "Message for priming AI behavior.\\n\\nThe system message is usually passed in as the first of a sequence\\nof input messages.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n from langchain_core.messages import HumanMessage, SystemMessage\\n\\n messages = [\\n SystemMessage(\\n content=\\"You are a helpful assistant! Your name is Bob.\\"\\n ),\\n HumanMessage(\\n content=\\"What is your name?\\"\\n )\\n ]\\n\\n # Define a chat model and invoke it with the messages\\n print(model.invoke(messages))", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "system", "default": "system", "enum": ["system"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}}, "required": ["content"], "title": "SystemMessage", "type": "object"}, "SystemMessageChunk": {"additionalProperties": true, "description": "System Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "SystemMessageChunk", "default": "SystemMessageChunk", "enum": ["SystemMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}}, "required": ["content"], "title": "SystemMessageChunk", "type": "object"}, "ToolCall": {"description": "Represents a request to call a tool.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n {\\n \\"name\\": \\"foo\\",\\n \\"args\\": {\\"a\\": 1},\\n \\"id\\": \\"123\\"\\n }\\n\\n This represents a request to call the tool named \\"foo\\" with arguments {\\"a\\": 1}\\n and an identifier of \\"123\\".", "properties": {"name": {"title": "Name", "type": "string"}, "args": {"title": "Args", "type": "object"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Id"}, "type": {"const": "tool_call", "enum": ["tool_call"], "title": "Type", "type": "string"}}, "required": ["name", "args", "id"], "title": "ToolCall", "type": "object"}, "ToolCallChunk": {"description": "A chunk of a tool call (e.g., as part of a stream).\\n\\nWhen merging ToolCallChunks (e.g., via AIMessageChunk.__add__),\\nall string attributes are concatenated. Chunks are only merged if their\\nvalues of `index` are equal and not None.\\n\\nExample:\\n\\n.. code-block:: python\\n\\n left_chunks = [ToolCallChunk(name=\\"foo\\", args=\'{\\"a\\":\', index=0)]\\n right_chunks = [ToolCallChunk(name=None, args=\'1}\', index=0)]\\n\\n (\\n AIMessageChunk(content=\\"\\", tool_call_chunks=left_chunks)\\n + AIMessageChunk(content=\\"\\", tool_call_chunks=right_chunks)\\n ).tool_call_chunks == [ToolCallChunk(name=\'foo\', args=\'{\\"a\\":1}\', index=0)]", "properties": {"name": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Name"}, "args": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Args"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "title": "Id"}, "index": {"anyOf": [{"type": "integer"}, {"type": "null"}], "title": "Index"}, "type": {"const": "tool_call_chunk", "enum": ["tool_call_chunk"], "title": "Type", "type": "string"}}, "required": ["name", "args", "id", "index"], "title": "ToolCallChunk", "type": "object"}, "ToolMessage": {"additionalProperties": true, "description": "Message for passing the result of executing a tool back to a model.\\n\\nToolMessages contain the result of a tool invocation. Typically, the result\\nis encoded inside the `content` field.\\n\\nExample: A ToolMessage representing a result of 42 from a tool call with id\\n\\n .. code-block:: python\\n\\n from langchain_core.messages import ToolMessage\\n\\n ToolMessage(content=\'42\', tool_call_id=\'call_Jja7J89XsjrOLA5r!MEOW!SL\')\\n\\n\\nExample: A ToolMessage where only part of the tool output is sent to the model\\n and the full output is passed in to artifact.\\n\\n .. versionadded:: 0.2.17\\n\\n .. code-block:: python\\n\\n from langchain_core.messages import ToolMessage\\n\\n tool_output = {\\n \\"stdout\\": \\"From the graph we can see that the correlation between x and y is ...\\",\\n \\"stderr\\": None,\\n \\"artifacts\\": {\\"type\\": \\"image\\", \\"base64_data\\": \\"/9j/4gIcSU...\\"},\\n }\\n\\n ToolMessage(\\n content=tool_output[\\"stdout\\"],\\n artifact=tool_output,\\n tool_call_id=\'call_Jja7J89XsjrOLA5r!MEOW!SL\',\\n )\\n\\nThe tool_call_id field is used to associate the tool call request with the\\ntool call response. This is useful in situations where a chat model is able\\nto request multiple tool calls in parallel.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "tool", "default": "tool", "enum": ["tool"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "tool_call_id": {"title": "Tool Call Id", "type": "string"}, "artifact": {"default": null, "title": "Artifact"}, "status": {"default": "success", "enum": ["success", "error"], "title": "Status", "type": "string"}}, "required": ["content", "tool_call_id"], "title": "ToolMessage", "type": "object"}, "ToolMessageChunk": {"additionalProperties": true, "description": "Tool Message chunk.", "properties": {"content": {"anyOf": [{"type": "string"}, {"items": {"anyOf": [{"type": "string"}, {"type": "object"}]}, "type": "array"}], "title": "Content"}, "additional_kwargs": {"title": "Additional Kwargs", "type": "object"}, "response_metadata": {"title": "Response Metadata", "type": "object"}, "type": {"const": "ToolMessageChunk", "default": "ToolMessageChunk", "enum": ["ToolMessageChunk"], "title": "Type", "type": "string"}, "name": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Name"}, "id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": null, "title": "Id"}, "tool_call_id": {"title": "Tool Call Id", "type": "string"}, "artifact": {"default": null, "title": "Artifact"}, "status": {"default": "success", "enum": ["success", "error"], "title": "Status", "type": "string"}}, "required": ["content", "tool_call_id"], "title": "ToolMessageChunk", "type": "object"}, "UsageMetadata": {"description": "Usage metadata for a message, such as token counts.\\n\\nThis is a standard representation of token usage that is consistent across models.\\n\\nExample:\\n\\n .. code-block:: python\\n\\n {\\n \\"input_tokens\\": 350,\\n \\"output_tokens\\": 240,\\n \\"total_tokens\\": 590,\\n \\"input_token_details\\": {\\n \\"audio\\": 10,\\n \\"cache_creation\\": 200,\\n \\"cache_read\\": 100,\\n },\\n \\"output_token_details\\": {\\n \\"audio\\": 10,\\n \\"reasoning\\": 200,\\n }\\n }\\n\\n.. versionchanged:: 0.3.9\\n\\n Added ``input_token_details`` and ``output_token_details``.", "properties": {"input_tokens": {"title": "Input Tokens", "type": "integer"}, "output_tokens": {"title": "Output Tokens", "type": "integer"}, "total_tokens": {"title": "Total Tokens", "type": "integer"}, "input_token_details": {"$ref": "#/$defs/InputTokenDetails"}, "output_token_details": {"$ref": "#/$defs/OutputTokenDetails"}}, "required": ["input_tokens", "output_tokens", "total_tokens"], "title": "UsageMetadata", "type": "object"}}, "default": null, "items": {"oneOf": [{"$ref": "#/$defs/AIMessage"}, {"$ref": "#/$defs/HumanMessage"}, {"$ref": "#/$defs/ChatMessage"}, {"$ref": "#/$defs/SystemMessage"}, {"$ref": "#/$defs/FunctionMessage"}, {"$ref": "#/$defs/ToolMessage"}, {"$ref": "#/$defs/AIMessageChunk"}, {"$ref": "#/$defs/HumanMessageChunk"}, {"$ref": "#/$defs/ChatMessageChunk"}, {"$ref": "#/$defs/SystemMessageChunk"}, {"$ref": "#/$defs/FunctionMessageChunk"}, {"$ref": "#/$defs/ToolMessageChunk"}]}, "title": "LangGraphOutput", "type": "array"}' # --- -# name: test_message_graph[shallow_postgres].2 +# name: test_message_graph[postgres_shallow].2 ''' { "nodes": [ @@ -3212,7 +3212,7 @@ } ''' # --- -# name: test_message_graph[shallow_postgres].3 +# name: test_message_graph[postgres_shallow].3 ''' graph TD; __start__ --> agent; @@ -3458,7 +3458,7 @@ ''' # --- -# name: test_send_react_interrupt_control[shallow_postgres] +# name: test_send_react_interrupt_control[postgres_shallow] ''' %%{init: {'flowchart': {'curve': 'linear'}}}%% graph TD; @@ -3578,7 +3578,7 @@ ''' # --- -# name: test_start_branch_then[shallow_postgres] +# name: test_start_branch_then[postgres_shallow] ''' %%{init: {'flowchart': {'curve': 'linear'}}}%% graph TD; @@ -3739,7 +3739,7 @@ ''' # --- -# name: test_weather_subgraph[shallow_postgres] +# name: test_weather_subgraph[postgres_shallow] ''' %%{init: {'flowchart': {'curve': 'linear'}}}%% graph TD; diff --git a/libs/langgraph/tests/__snapshots__/test_large_cases_async.ambr b/libs/langgraph/tests/__snapshots__/test_large_cases_async.ambr new file mode 100644 index 000000000..898dd4f89 --- /dev/null +++ b/libs/langgraph/tests/__snapshots__/test_large_cases_async.ambr @@ -0,0 +1,151 @@ +# serializer version: 1 +# name: test_weather_subgraph[memory] + ''' + %%{init: {'flowchart': {'curve': 'linear'}}}%% + graph TD; + __start__([

__start__

]):::first + router_node(router_node) + normal_llm_node(normal_llm_node) + weather_graph_model_node(model_node) + weather_graph_weather_node(weather_node
__interrupt = before) + __end__([

__end__

]):::last + __start__ --> router_node; + normal_llm_node --> __end__; + weather_graph_weather_node --> __end__; + router_node -.-> normal_llm_node; + router_node -.-> weather_graph_model_node; + router_node -.-> __end__; + subgraph weather_graph + weather_graph_model_node --> weather_graph_weather_node; + end + classDef default fill:#f2f0ff,line-height:1.2 + classDef first fill-opacity:0 + classDef last fill:#bfb6fc + + ''' +# --- +# name: test_weather_subgraph[postgres_aio] + ''' + %%{init: {'flowchart': {'curve': 'linear'}}}%% + graph TD; + __start__([

__start__

]):::first + router_node(router_node) + normal_llm_node(normal_llm_node) + weather_graph_model_node(model_node) + weather_graph_weather_node(weather_node
__interrupt = before) + __end__([

__end__

]):::last + __start__ --> router_node; + normal_llm_node --> __end__; + weather_graph_weather_node --> __end__; + router_node -.-> normal_llm_node; + router_node -.-> weather_graph_model_node; + router_node -.-> __end__; + subgraph weather_graph + weather_graph_model_node --> weather_graph_weather_node; + end + classDef default fill:#f2f0ff,line-height:1.2 + classDef first fill-opacity:0 + classDef last fill:#bfb6fc + + ''' +# --- +# name: test_weather_subgraph[postgres_aio_pipe] + ''' + %%{init: {'flowchart': {'curve': 'linear'}}}%% + graph TD; + __start__([

__start__

]):::first + router_node(router_node) + normal_llm_node(normal_llm_node) + weather_graph_model_node(model_node) + weather_graph_weather_node(weather_node
__interrupt = before) + __end__([

__end__

]):::last + __start__ --> router_node; + normal_llm_node --> __end__; + weather_graph_weather_node --> __end__; + router_node -.-> normal_llm_node; + router_node -.-> weather_graph_model_node; + router_node -.-> __end__; + subgraph weather_graph + weather_graph_model_node --> weather_graph_weather_node; + end + classDef default fill:#f2f0ff,line-height:1.2 + classDef first fill-opacity:0 + classDef last fill:#bfb6fc + + ''' +# --- +# name: test_weather_subgraph[postgres_aio_pool] + ''' + %%{init: {'flowchart': {'curve': 'linear'}}}%% + graph TD; + __start__([

__start__

]):::first + router_node(router_node) + normal_llm_node(normal_llm_node) + weather_graph_model_node(model_node) + weather_graph_weather_node(weather_node
__interrupt = before) + __end__([

__end__

]):::last + __start__ --> router_node; + normal_llm_node --> __end__; + weather_graph_weather_node --> __end__; + router_node -.-> normal_llm_node; + router_node -.-> weather_graph_model_node; + router_node -.-> __end__; + subgraph weather_graph + weather_graph_model_node --> weather_graph_weather_node; + end + classDef default fill:#f2f0ff,line-height:1.2 + classDef first fill-opacity:0 + classDef last fill:#bfb6fc + + ''' +# --- +# name: test_weather_subgraph[postgres_aio_shallow] + ''' + %%{init: {'flowchart': {'curve': 'linear'}}}%% + graph TD; + __start__([

__start__

]):::first + router_node(router_node) + normal_llm_node(normal_llm_node) + weather_graph_model_node(model_node) + weather_graph_weather_node(weather_node
__interrupt = before) + __end__([

__end__

]):::last + __start__ --> router_node; + normal_llm_node --> __end__; + weather_graph_weather_node --> __end__; + router_node -.-> normal_llm_node; + router_node -.-> weather_graph_model_node; + router_node -.-> __end__; + subgraph weather_graph + weather_graph_model_node --> weather_graph_weather_node; + end + classDef default fill:#f2f0ff,line-height:1.2 + classDef first fill-opacity:0 + classDef last fill:#bfb6fc + + ''' +# --- +# name: test_weather_subgraph[sqlite_aio] + ''' + %%{init: {'flowchart': {'curve': 'linear'}}}%% + graph TD; + __start__([

__start__

]):::first + router_node(router_node) + normal_llm_node(normal_llm_node) + weather_graph_model_node(model_node) + weather_graph_weather_node(weather_node
__interrupt = before) + __end__([

__end__

]):::last + __start__ --> router_node; + normal_llm_node --> __end__; + weather_graph_weather_node --> __end__; + router_node -.-> normal_llm_node; + router_node -.-> weather_graph_model_node; + router_node -.-> __end__; + subgraph weather_graph + weather_graph_model_node --> weather_graph_weather_node; + end + classDef default fill:#f2f0ff,line-height:1.2 + classDef first fill-opacity:0 + classDef last fill:#bfb6fc + + ''' +# --- diff --git a/libs/langgraph/tests/__snapshots__/test_pregel.ambr b/libs/langgraph/tests/__snapshots__/test_pregel.ambr index 10e42fdb3..97294512f 100644 --- a/libs/langgraph/tests/__snapshots__/test_pregel.ambr +++ b/libs/langgraph/tests/__snapshots__/test_pregel.ambr @@ -2891,7 +2891,7 @@ ''' # --- -# name: test_in_one_fan_out_state_graph_waiting_edge[shallow_postgres] +# name: test_in_one_fan_out_state_graph_waiting_edge[postgres_shallow] ''' graph TD; __start__ --> rewrite_query; @@ -3407,7 +3407,7 @@ 'type': 'object', }) # --- -# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic1[shallow_postgres] +# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic1[postgres_shallow] ''' graph TD; __start__ --> rewrite_query; @@ -3420,7 +3420,7 @@ ''' # --- -# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic1[shallow_postgres].1 +# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic1[postgres_shallow].1 dict({ 'definitions': dict({ 'InnerObject': dict({ @@ -3454,7 +3454,7 @@ 'type': 'object', }) # --- -# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic1[shallow_postgres].2 +# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic1[postgres_shallow].2 dict({ 'properties': dict({ 'answer': dict({ @@ -4024,7 +4024,7 @@ 'type': 'object', }) # --- -# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[shallow_postgres] +# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[postgres_shallow] ''' graph TD; __start__ --> rewrite_query; @@ -4037,7 +4037,7 @@ ''' # --- -# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[shallow_postgres].1 +# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[postgres_shallow].1 dict({ '$defs': dict({ 'InnerObject': dict({ @@ -4071,7 +4071,7 @@ 'type': 'object', }) # --- -# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[shallow_postgres].2 +# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[postgres_shallow].2 dict({ 'properties': dict({ 'answer': dict({ @@ -4242,7 +4242,7 @@ ''' # --- -# name: test_in_one_fan_out_state_graph_waiting_edge_via_branch[shallow_postgres] +# name: test_in_one_fan_out_state_graph_waiting_edge_via_branch[postgres_shallow] ''' graph TD; __start__ --> rewrite_query; diff --git a/libs/langgraph/tests/__snapshots__/test_pregel_async.ambr b/libs/langgraph/tests/__snapshots__/test_pregel_async.ambr index 46916c7a4..22c2562ce 100644 --- a/libs/langgraph/tests/__snapshots__/test_pregel_async.ambr +++ b/libs/langgraph/tests/__snapshots__/test_pregel_async.ambr @@ -934,6 +934,127 @@ 'type': 'object', }) # --- +# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[postgres_aio_shallow] + ''' + graph TD; + __start__ --> rewrite_query; + analyzer_one --> retriever_one; + qa --> __end__; + retriever_one --> qa; + retriever_two --> qa; + rewrite_query --> analyzer_one; + rewrite_query -.-> retriever_two; + + ''' +# --- +# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[postgres_aio_shallow].1 + dict({ + '$defs': dict({ + 'InnerObject': dict({ + 'properties': dict({ + 'yo': dict({ + 'title': 'Yo', + 'type': 'integer', + }), + }), + 'required': list([ + 'yo', + ]), + 'title': 'InnerObject', + 'type': 'object', + }), + }), + 'properties': dict({ + 'answer': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Answer', + }), + 'docs': dict({ + 'items': dict({ + 'type': 'string', + }), + 'title': 'Docs', + 'type': 'array', + }), + 'inner': dict({ + '$ref': '#/$defs/InnerObject', + }), + 'query': dict({ + 'title': 'Query', + 'type': 'string', + }), + }), + 'required': list([ + 'query', + 'inner', + 'docs', + ]), + 'title': 'State', + 'type': 'object', + }) +# --- +# name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[postgres_aio_shallow].2 + dict({ + '$defs': dict({ + 'InnerObject': dict({ + 'properties': dict({ + 'yo': dict({ + 'title': 'Yo', + 'type': 'integer', + }), + }), + 'required': list([ + 'yo', + ]), + 'title': 'InnerObject', + 'type': 'object', + }), + }), + 'properties': dict({ + 'answer': dict({ + 'anyOf': list([ + dict({ + 'type': 'string', + }), + dict({ + 'type': 'null', + }), + ]), + 'default': None, + 'title': 'Answer', + }), + 'docs': dict({ + 'items': dict({ + 'type': 'string', + }), + 'title': 'Docs', + 'type': 'array', + }), + 'inner': dict({ + '$ref': '#/$defs/InnerObject', + }), + 'query': dict({ + 'title': 'Query', + 'type': 'string', + }), + }), + 'required': list([ + 'query', + 'inner', + 'docs', + ]), + 'title': 'State', + 'type': 'object', + }) +# --- # name: test_in_one_fan_out_state_graph_waiting_edge_custom_state_class_pydantic2[sqlite_aio] ''' graph TD; @@ -1362,6 +1483,21 @@ ''' # --- +# name: test_send_react_interrupt_control[postgres_aio_shallow] + ''' + %%{init: {'flowchart': {'curve': 'linear'}}}%% + graph TD; + __start__([

__start__

]):::first + agent(agent) + foo([foo]):::last + __start__ --> agent; + agent -.-> foo; + classDef default fill:#f2f0ff,line-height:1.2 + classDef first fill-opacity:0 + classDef last fill:#bfb6fc + + ''' +# --- # name: test_send_react_interrupt_control[sqlite_aio] ''' %%{init: {'flowchart': {'curve': 'linear'}}}%% diff --git a/libs/langgraph/tests/conftest.py b/libs/langgraph/tests/conftest.py index aea36597c..a7909eb15 100644 --- a/libs/langgraph/tests/conftest.py +++ b/libs/langgraph/tests/conftest.py @@ -14,7 +14,10 @@ from langgraph.checkpoint.duckdb import DuckDBSaver from langgraph.checkpoint.duckdb.aio import AsyncDuckDBSaver from langgraph.checkpoint.postgres import PostgresSaver, ShallowPostgresSaver -from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver +from langgraph.checkpoint.postgres.aio import ( + AsyncPostgresSaver, + AsyncShallowPostgresSaver, +) from langgraph.checkpoint.sqlite import SqliteSaver from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver from langgraph.store.base import BaseStore @@ -101,7 +104,7 @@ def checkpointer_postgres(): @pytest.fixture(scope="function") -def checkpointer_shallow_postgres(): +def checkpointer_postgres_shallow(): database = f"test_{uuid4().hex[:16]}" # create unique db with Connection.connect(DEFAULT_POSTGRES_URI, autocommit=True) as conn: @@ -186,6 +189,31 @@ async def _checkpointer_postgres_aio(): await conn.execute(f"DROP DATABASE {database}") +@asynccontextmanager +async def _checkpointer_postgres_aio_shallow(): + if sys.version_info < (3, 10): + pytest.skip("Async Postgres tests require Python 3.10+") + database = f"test_{uuid4().hex[:16]}" + # create unique db + async with await AsyncConnection.connect( + DEFAULT_POSTGRES_URI, autocommit=True + ) as conn: + await conn.execute(f"CREATE DATABASE {database}") + try: + # yield checkpointer + async with AsyncShallowPostgresSaver.from_conn_string( + DEFAULT_POSTGRES_URI + database + ) as checkpointer: + await checkpointer.setup() + yield checkpointer + finally: + # drop unique db + async with await AsyncConnection.connect( + DEFAULT_POSTGRES_URI, autocommit=True + ) as conn: + await conn.execute(f"DROP DATABASE {database}") + + @asynccontextmanager async def _checkpointer_postgres_aio_pipe(): if sys.version_info < (3, 10): @@ -259,6 +287,9 @@ async def awith_checkpointer( elif checkpointer_name == "postgres_aio": async with _checkpointer_postgres_aio() as checkpointer: yield checkpointer + elif checkpointer_name == "postgres_aio_shallow": + async with _checkpointer_postgres_aio_shallow() as checkpointer: + yield checkpointer elif checkpointer_name == "postgres_aio_pipe": async with _checkpointer_postgres_aio_pipe() as checkpointer: yield checkpointer @@ -436,7 +467,7 @@ async def awith_store(store_name: Optional[str]) -> AsyncIterator[BaseStore]: raise NotImplementedError(f"Unknown store {store_name}") -SHALLOW_CHECKPOINTERS_SYNC = ["shallow_postgres"] +SHALLOW_CHECKPOINTERS_SYNC = ["postgres_shallow"] REGULAR_CHECKPOINTERS_SYNC = [ "memory", "sqlite", @@ -448,13 +479,18 @@ async def awith_store(store_name: Optional[str]) -> AsyncIterator[BaseStore]: *REGULAR_CHECKPOINTERS_SYNC, *SHALLOW_CHECKPOINTERS_SYNC, ] -ALL_CHECKPOINTERS_ASYNC = [ +SHALLOW_CHECKPOINTERS_ASYNC = ["postgres_aio_shallow"] +REGULAR_CHECKPOINTERS_ASYNC = [ "memory", "sqlite_aio", "postgres_aio", "postgres_aio_pipe", "postgres_aio_pool", ] +ALL_CHECKPOINTERS_ASYNC = [ + *REGULAR_CHECKPOINTERS_ASYNC, + *SHALLOW_CHECKPOINTERS_ASYNC, +] ALL_CHECKPOINTERS_ASYNC_PLUS_NONE = [ *ALL_CHECKPOINTERS_ASYNC, None, diff --git a/libs/langgraph/tests/test_large_cases.py b/libs/langgraph/tests/test_large_cases.py index 426d7692b..6bab8a647 100644 --- a/libs/langgraph/tests/test_large_cases.py +++ b/libs/langgraph/tests/test_large_cases.py @@ -6098,7 +6098,11 @@ class State(TypedDict): parent_config=( None if "shallow" in checkpointer_name - else list(tool_two.checkpointer.list(thread1, limit=2))[-1].config + else list( + tool_two.checkpointer.list( + {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}, limit=2 + ) + )[-1].config ), ) # clear the interrupt and next tasks @@ -6126,7 +6130,11 @@ class State(TypedDict): parent_config=( None if "shallow" in checkpointer_name - else list(tool_two.checkpointer.list(thread1, limit=2))[-1].config + else list( + tool_two.checkpointer.list( + {"configurable": {"thread_id": "1", "checkpoint_ns": ""}}, limit=2 + ) + )[-1].config ), ) diff --git a/libs/langgraph/tests/test_large_cases_async.py b/libs/langgraph/tests/test_large_cases_async.py index 7be4f6943..d7a647a53 100644 --- a/libs/langgraph/tests/test_large_cases_async.py +++ b/libs/langgraph/tests/test_large_cases_async.py @@ -6,7 +6,6 @@ from typing import ( Annotated, Any, - AnyStr, AsyncIterator, Literal, Optional, @@ -23,21 +22,25 @@ from pytest_mock import MockerFixture from syrupy import SnapshotAssertion +from langgraph.channels.context import Context from langgraph.channels.last_value import LastValue from langgraph.channels.untracked_value import UntrackedValue from langgraph.constants import END, FF_SEND_V2, PULL, PUSH, START from langgraph.graph.graph import Graph from langgraph.graph.message import MessageGraph, add_messages from langgraph.graph.state import StateGraph -from langgraph.managed.context import Context from langgraph.managed.shared_value import SharedValue from langgraph.prebuilt.chat_agent_executor import create_react_agent from langgraph.prebuilt.tool_node import ToolNode from langgraph.pregel import Channel, Pregel from langgraph.store.memory import InMemoryStore from langgraph.types import PregelTask, Send, StateSnapshot, StreamWriter -from tests.any_str import AnyDict -from tests.conftest import ALL_CHECKPOINTERS_ASYNC, awith_checkpointer +from tests.any_str import AnyDict, AnyStr +from tests.conftest import ( + ALL_CHECKPOINTERS_ASYNC, + REGULAR_CHECKPOINTERS_ASYNC, + awith_checkpointer, +) from tests.fake_chat import FakeChatModel from tests.fake_tracer import FakeTracer from tests.messages import ( @@ -47,6 +50,8 @@ _AnyIdToolMessage, ) +pytestmark = pytest.mark.anyio + @pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_ASYNC) async def test_invoke_two_processes_in_out_interrupt( @@ -110,6 +115,9 @@ async def test_invoke_two_processes_in_out_interrupt( snapshot = await app.aget_state(thread2) assert snapshot.next == () + if "shallow" in checkpointer_name: + return + # list history history = [c async for c in app.aget_state_history(thread1)] assert history == [ @@ -310,7 +318,7 @@ async def test_invoke_two_processes_in_out_interrupt( ] -@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_ASYNC) +@pytest.mark.parametrize("checkpointer_name", REGULAR_CHECKPOINTERS_ASYNC) async def test_fork_always_re_runs_nodes( checkpointer_name: str, mocker: MockerFixture ) -> None: @@ -843,9 +851,13 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) await app_w_interrupt.aupdate_state( @@ -873,10 +885,14 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, tasks=(PregelTask(AnyStr(), "tools", (PULL, "tools")),), next=("tools",), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "update", @@ -893,9 +909,13 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) assert [c async for c in app_w_interrupt.astream(None, config)] == [ @@ -989,10 +1009,14 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, tasks=(), next=(), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "update", @@ -1018,9 +1042,13 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) # test state get/update methods with interrupt_before @@ -1063,10 +1091,14 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, tasks=(PregelTask(AnyStr(), "tools", (PULL, "tools")),), next=("tools",), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "2", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -1085,9 +1117,13 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, "thread_id": "2", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) await app_w_interrupt.aupdate_state( @@ -1115,10 +1151,14 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, tasks=(PregelTask(AnyStr(), "tools", (PULL, "tools")),), next=("tools",), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "2", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "update", @@ -1135,9 +1175,13 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, "thread_id": "2", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) assert [c async for c in app_w_interrupt.astream(None, config)] == [ @@ -1231,10 +1275,14 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, tasks=(), next=(), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "2", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "update", @@ -1260,9 +1308,13 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, "thread_id": "2", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) # test re-invoke to continue with interrupt_before @@ -1305,10 +1357,14 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, tasks=(PregelTask(AnyStr(), "tools", (PULL, "tools")),), next=("tools",), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "3", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -1327,9 +1383,13 @@ async def should_continue(data: dict, config: RunnableConfig) -> str: }, "thread_id": "3", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) assert [c async for c in app_w_interrupt.astream(None, config)] == [ @@ -1729,10 +1789,14 @@ def should_continue(data: AgentState) -> str: }, tasks=(PregelTask(AnyStr(), "tools", (PULL, "tools")),), next=("tools",), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -1748,9 +1812,13 @@ def should_continue(data: AgentState) -> str: }, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) async with assert_ctx_once(): @@ -1776,10 +1844,14 @@ def should_continue(data: AgentState) -> str: }, tasks=(PregelTask(AnyStr(), "tools", (PULL, "tools")),), next=("tools",), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "update", @@ -1795,9 +1867,13 @@ def should_continue(data: AgentState) -> str: }, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) async with assert_ctx_once(): @@ -1858,10 +1934,14 @@ def should_continue(data: AgentState) -> str: }, tasks=(), next=(), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "update", @@ -1876,9 +1956,13 @@ def should_continue(data: AgentState) -> str: }, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) # test state get/update methods with interrupt_before @@ -1917,10 +2001,14 @@ def should_continue(data: AgentState) -> str: }, tasks=(PregelTask(AnyStr(), "tools", (PULL, "tools")),), next=("tools",), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "2", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -1936,9 +2024,13 @@ def should_continue(data: AgentState) -> str: }, "thread_id": "2", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) await app_w_interrupt.aupdate_state( @@ -1963,10 +2055,14 @@ def should_continue(data: AgentState) -> str: }, tasks=(PregelTask(AnyStr(), "tools", (PULL, "tools")),), next=("tools",), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "2", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "update", @@ -1982,9 +2078,13 @@ def should_continue(data: AgentState) -> str: }, "thread_id": "2", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) assert [c async for c in app_w_interrupt.astream(None, config)] == [ @@ -2043,10 +2143,14 @@ def should_continue(data: AgentState) -> str: }, tasks=(), next=(), - config=(await app_w_interrupt.checkpointer.aget_tuple(config)).config, - created_at=( - await app_w_interrupt.checkpointer.aget_tuple(config) - ).checkpoint["ts"], + config={ + "configurable": { + "thread_id": "2", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "update", @@ -2061,9 +2165,13 @@ def should_continue(data: AgentState) -> str: }, "thread_id": "2", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) @@ -3508,9 +3616,13 @@ def should_continue(messages): }, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) # modify ai message @@ -3558,9 +3670,13 @@ def should_continue(messages): }, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) assert [c async for c in app_w_interrupt.astream(None, config)] == [ @@ -3644,9 +3760,13 @@ def should_continue(messages): }, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) await app_w_interrupt.aupdate_state( @@ -3688,9 +3808,13 @@ def should_continue(messages): "writes": {"agent": AIMessage(content="answer", id="ai2")}, "thread_id": "1", }, - parent_config=[ - c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in app_w_interrupt.checkpointer.alist(config, limit=2) + ][-1].config + ), ) @@ -3983,32 +4107,38 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: "my_key": "value", "market": "DE", } - assert [c.metadata async for c in tool_two.checkpointer.alist(thread1)] == [ - { - "parents": {}, - "source": "loop", - "step": 0, - "writes": None, - "assistant_id": "a", - "thread_id": "1", - }, - { - "parents": {}, - "source": "input", - "step": -1, - "writes": {"__start__": {"my_key": "value", "market": "DE"}}, - "assistant_id": "a", - "thread_id": "1", - }, - ] + if "shallow" not in checkpointer_name: + assert [c.metadata async for c in tool_two.checkpointer.alist(thread1)] == [ + { + "parents": {}, + "source": "loop", + "step": 0, + "writes": None, + "assistant_id": "a", + "thread_id": "1", + }, + { + "parents": {}, + "source": "input", + "step": -1, + "writes": {"__start__": {"my_key": "value", "market": "DE"}}, + "assistant_id": "a", + "thread_id": "1", + }, + ] + assert await tool_two.aget_state(thread1) == StateSnapshot( values={"my_key": "value", "market": "DE"}, tasks=(PregelTask(AnyStr(), "tool_two_slow", (PULL, "tool_two_slow")),), next=("tool_two_slow",), - config=(await tool_two.checkpointer.aget_tuple(thread1)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread1)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "1", + "checkpoint_ns": AnyStr(), + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4017,9 +4147,13 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: "assistant_id": "a", "thread_id": "1", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread1, limit=2)][ + -1 + ].config + ), ) # resume, for same result as above assert await tool_two.ainvoke(None, thread1, debug=1) == { @@ -4030,21 +4164,29 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: values={"my_key": "value slow", "market": "DE"}, tasks=(), next=(), - config=(await tool_two.checkpointer.aget_tuple(thread1)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread1)).checkpoint[ - "ts" - ], - metadata={ - "parents": {}, + config={ + "configurable": { + "thread_id": "1", + "checkpoint_ns": AnyStr(), + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), + metadata={ + "parents": {}, "source": "loop", "step": 1, "writes": {"tool_two_slow": {"my_key": " slow"}}, "assistant_id": "a", "thread_id": "1", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread1, limit=2)][ + -1 + ].config + ), ) thread2 = {"configurable": {"thread_id": "2", "assistant_id": "a"}} @@ -4057,10 +4199,14 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: values={"my_key": "value", "market": "US"}, tasks=(PregelTask(AnyStr(), "tool_two_fast", (PULL, "tool_two_fast")),), next=("tool_two_fast",), - config=(await tool_two.checkpointer.aget_tuple(thread2)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread2)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "2", + "checkpoint_ns": AnyStr(), + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4069,9 +4215,13 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: "assistant_id": "a", "thread_id": "2", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread2, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread2, limit=2)][ + -1 + ].config + ), ) # resume, for same result as above assert await tool_two.ainvoke(None, thread2, debug=1) == { @@ -4082,10 +4232,14 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: values={"my_key": "value fast", "market": "US"}, tasks=(), next=(), - config=(await tool_two.checkpointer.aget_tuple(thread2)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread2)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "2", + "checkpoint_ns": AnyStr(), + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4094,9 +4248,13 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: "assistant_id": "a", "thread_id": "2", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread2, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread2, limit=2)][ + -1 + ].config + ), ) thread3 = {"configurable": {"thread_id": "3", "assistant_id": "b"}} @@ -4109,10 +4267,14 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: values={"my_key": "value", "market": "US"}, tasks=(PregelTask(AnyStr(), "tool_two_fast", (PULL, "tool_two_fast")),), next=("tool_two_fast",), - config=(await tool_two.checkpointer.aget_tuple(thread3)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread3)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "3", + "checkpoint_ns": AnyStr(), + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4121,9 +4283,13 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: "assistant_id": "b", "thread_id": "3", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread3, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread3, limit=2)][ + -1 + ].config + ), ) # update state await tool_two.aupdate_state(thread3, {"my_key": "key"}) # appends to my_key @@ -4131,10 +4297,14 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: values={"my_key": "valuekey", "market": "US"}, tasks=(PregelTask(AnyStr(), "tool_two_fast", (PULL, "tool_two_fast")),), next=("tool_two_fast",), - config=(await tool_two.checkpointer.aget_tuple(thread3)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread3)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "3", + "checkpoint_ns": AnyStr(), + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "update", @@ -4143,9 +4313,13 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: "assistant_id": "b", "thread_id": "3", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread3, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread3, limit=2)][ + -1 + ].config + ), ) # resume, for same result as above assert await tool_two.ainvoke(None, thread3, debug=1) == { @@ -4156,10 +4330,14 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: values={"my_key": "valuekey fast", "market": "US"}, tasks=(), next=(), - config=(await tool_two.checkpointer.aget_tuple(thread3)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread3)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "3", + "checkpoint_ns": AnyStr(), + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4168,9 +4346,13 @@ def tool_two_fast(data: State, config: RunnableConfig) -> State: "assistant_id": "b", "thread_id": "3", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread3, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread3, limit=2)][ + -1 + ].config + ), ) @@ -4688,10 +4870,14 @@ class State(TypedDict): values={"my_key": "value prepared", "market": "DE"}, tasks=(PregelTask(AnyStr(), "tool_two_slow", (PULL, "tool_two_slow")),), next=("tool_two_slow",), - config=(await tool_two.checkpointer.aget_tuple(thread1)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread1)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "11", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4699,9 +4885,13 @@ class State(TypedDict): "writes": {"prepare": {"my_key": " prepared"}}, "thread_id": "11", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread1, limit=2)][ + -1 + ].config + ), ) # resume, for same result as above assert await tool_two.ainvoke(None, thread1, debug=1) == { @@ -4712,10 +4902,14 @@ class State(TypedDict): values={"my_key": "value prepared slow finished", "market": "DE"}, tasks=(), next=(), - config=(await tool_two.checkpointer.aget_tuple(thread1)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread1)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "11", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4723,9 +4917,13 @@ class State(TypedDict): "writes": {"finish": {"my_key": " finished"}}, "thread_id": "11", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread1, limit=2)][ + -1 + ].config + ), ) thread2 = {"configurable": {"thread_id": "12"}} @@ -4738,10 +4936,14 @@ class State(TypedDict): values={"my_key": "value prepared", "market": "US"}, tasks=(PregelTask(AnyStr(), "tool_two_fast", (PULL, "tool_two_fast")),), next=("tool_two_fast",), - config=(await tool_two.checkpointer.aget_tuple(thread2)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread2)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "12", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4749,9 +4951,13 @@ class State(TypedDict): "writes": {"prepare": {"my_key": " prepared"}}, "thread_id": "12", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread2, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread2, limit=2)][ + -1 + ].config + ), ) # resume, for same result as above assert await tool_two.ainvoke(None, thread2, debug=1) == { @@ -4762,10 +4968,14 @@ class State(TypedDict): values={"my_key": "value prepared fast finished", "market": "US"}, tasks=(), next=(), - config=(await tool_two.checkpointer.aget_tuple(thread2)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread2)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "12", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4773,9 +4983,13 @@ class State(TypedDict): "writes": {"finish": {"my_key": " finished"}}, "thread_id": "12", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread2, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread2, limit=2)][ + -1 + ].config + ), ) tool_two = tool_two_graph.compile( @@ -4796,10 +5010,14 @@ class State(TypedDict): values={"my_key": "value prepared", "market": "DE"}, tasks=(PregelTask(AnyStr(), "tool_two_slow", (PULL, "tool_two_slow")),), next=("tool_two_slow",), - config=(await tool_two.checkpointer.aget_tuple(thread1)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread1)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "21", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4807,9 +5025,13 @@ class State(TypedDict): "writes": {"prepare": {"my_key": " prepared"}}, "thread_id": "21", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread1, limit=2)][ + -1 + ].config + ), ) # resume, for same result as above assert await tool_two.ainvoke(None, thread1, debug=1) == { @@ -4820,10 +5042,14 @@ class State(TypedDict): values={"my_key": "value prepared slow finished", "market": "DE"}, tasks=(), next=(), - config=(await tool_two.checkpointer.aget_tuple(thread1)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread1)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "21", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4831,9 +5057,13 @@ class State(TypedDict): "writes": {"finish": {"my_key": " finished"}}, "thread_id": "21", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread1, limit=2)][ + -1 + ].config + ), ) thread2 = {"configurable": {"thread_id": "22"}} @@ -4846,10 +5076,14 @@ class State(TypedDict): values={"my_key": "value prepared", "market": "US"}, tasks=(PregelTask(AnyStr(), "tool_two_fast", (PULL, "tool_two_fast")),), next=("tool_two_fast",), - config=(await tool_two.checkpointer.aget_tuple(thread2)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread2)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "22", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4857,9 +5091,13 @@ class State(TypedDict): "writes": {"prepare": {"my_key": " prepared"}}, "thread_id": "22", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread2, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread2, limit=2)][ + -1 + ].config + ), ) # resume, for same result as above assert await tool_two.ainvoke(None, thread2, debug=1) == { @@ -4870,10 +5108,14 @@ class State(TypedDict): values={"my_key": "value prepared fast finished", "market": "US"}, tasks=(), next=(), - config=(await tool_two.checkpointer.aget_tuple(thread2)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread2)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "22", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4881,9 +5123,13 @@ class State(TypedDict): "writes": {"finish": {"my_key": " finished"}}, "thread_id": "22", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread2, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread2, limit=2)][ + -1 + ].config + ), ) thread3 = {"configurable": {"thread_id": "23"}} @@ -4917,10 +5163,14 @@ class State(TypedDict): values={"my_key": "key prepared", "market": "DE"}, tasks=(PregelTask(AnyStr(), "tool_two_slow", (PULL, "tool_two_slow")),), next=("tool_two_slow",), - config=(await tool_two.checkpointer.aget_tuple(thread3)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread3)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "23", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4928,7 +5178,7 @@ class State(TypedDict): "writes": {"prepare": {"my_key": " prepared"}}, "thread_id": "23", }, - parent_config=uconfig, + parent_config=(None if "shallow" in checkpointer_name else uconfig), ) # resume, for same result as above assert await tool_two.ainvoke(None, thread3, debug=1) == { @@ -4939,10 +5189,14 @@ class State(TypedDict): values={"my_key": "key prepared slow finished", "market": "DE"}, tasks=(), next=(), - config=(await tool_two.checkpointer.aget_tuple(thread3)).config, - created_at=(await tool_two.checkpointer.aget_tuple(thread3)).checkpoint[ - "ts" - ], + config={ + "configurable": { + "thread_id": "23", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + }, + created_at=AnyStr(), metadata={ "parents": {}, "source": "loop", @@ -4950,9 +5204,13 @@ class State(TypedDict): "writes": {"finish": {"my_key": " finished"}}, "thread_id": "23", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread3, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread3, limit=2)][ + -1 + ].config + ), ) @@ -5038,13 +5296,17 @@ def outer_2(state: State): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), ) # now, get_state with subgraphs state assert await app.aget_state(config, subgraphs=True) == StateSnapshot( @@ -5098,16 +5360,20 @@ def outer_2(state: State): "langgraph_checkpoint_ns": AnyStr("inner:"), }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": AnyStr("inner:"), - "checkpoint_id": AnyStr(), - "checkpoint_map": AnyDict( - {"": AnyStr(), AnyStr("child:"): AnyStr()} - ), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": AnyStr("inner:"), + "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), + } } - }, + ), ), ), ), @@ -5127,17 +5393,21 @@ def outer_2(state: State): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), ) # get_state_history returns outer graph checkpoints history = [c async for c in app.aget_state_history(config)] - assert history == [ + expected_history = [ StateSnapshot( values={"my_key": "hi my value"}, tasks=( @@ -5169,13 +5439,17 @@ def outer_2(state: State): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), ), StateSnapshot( values={"my_key": "my value"}, @@ -5240,11 +5514,17 @@ def outer_2(state: State): parent_config=None, ), ] + + if "shallow" in checkpointer_name: + expected_history = expected_history[:1] + + assert history == expected_history + # get_state_history for a subgraph returns its checkpoints child_history = [ c async for c in app.aget_state_history(history[0].tasks[0].state) ] - assert child_history == [ + expected_child_history = [ StateSnapshot( values={"my_key": "hi my value here", "my_other_key": "hi my value"}, next=("inner_2",), @@ -5277,16 +5557,20 @@ def outer_2(state: State): "langgraph_checkpoint_ns": AnyStr("inner:"), }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": AnyStr("inner:"), - "checkpoint_id": AnyStr(), - "checkpoint_map": AnyDict( - {"": AnyStr(), AnyStr("child:"): AnyStr()} - ), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": AnyStr("inner:"), + "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), + } } - }, + ), tasks=(PregelTask(AnyStr(), "inner_2", (PULL, "inner_2")),), ), StateSnapshot( @@ -5377,6 +5661,11 @@ def outer_2(state: State): ), ] + if "shallow" in checkpointer_name: + expected_child_history = expected_child_history[:1] + + assert child_history == expected_child_history + # resume await app.ainvoke(None, config, debug=True) # test state w/ nested subgraph state (after resuming from interrupt) @@ -5401,13 +5690,17 @@ def outer_2(state: State): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), ) # test full history at the end actual_history = [c async for c in app.aget_state_history(config)] @@ -5435,13 +5728,17 @@ def outer_2(state: State): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), ), StateSnapshot( values={"my_key": "hi my value here and there"}, @@ -5580,6 +5877,9 @@ def outer_2(state: State): parent_config=None, ), ] + if "shallow" in checkpointer_name: + expected_history = expected_history[:1] + assert actual_history == expected_history # test looking up parent state by checkpoint ID for actual_snapshot, expected_snapshot in zip(actual_history, expected_history): @@ -5683,13 +5983,17 @@ def parent_2(state: State): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), ) child_state = await app.aget_state(outer_state.tasks[0].state) assert ( @@ -5725,13 +6029,17 @@ def parent_2(state: State): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": AnyStr("child:"), - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": AnyStr("child:"), + "checkpoint_id": AnyStr(), + } } - }, + ), ).tasks[0] ) grandchild_state = await app.aget_state(child_state.tasks[0].state) @@ -5778,20 +6086,24 @@ def parent_2(state: State): "langgraph_triggers": [AnyStr("start:child_1")], }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": AnyStr(), - "checkpoint_id": AnyStr(), - "checkpoint_map": AnyDict( - { - "": AnyStr(), - AnyStr("child:"): AnyStr(), - AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), - } - ), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": AnyStr(), + "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr(re.compile(r"child:.+|child1:")): AnyStr(), + } + ), + } } - }, + ), ) # get state with subgraphs assert await app.aget_state(config, subgraphs=True) == StateSnapshot( @@ -5860,22 +6172,28 @@ def parent_2(state: State): "langgraph_triggers": [AnyStr("start:child_1")], }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": AnyStr(), - "checkpoint_id": AnyStr(), - "checkpoint_map": AnyDict( - { - "": AnyStr(), - AnyStr("child:"): AnyStr(), - AnyStr( - re.compile(r"child:.+|child1:") - ): AnyStr(), - } - ), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": AnyStr(), + "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("child:"): AnyStr(), + AnyStr( + re.compile( + r"child:.+|child1:" + ) + ): AnyStr(), + } + ), + } } - }, + ), ), ), ), @@ -5904,16 +6222,20 @@ def parent_2(state: State): "langgraph_checkpoint_ns": AnyStr("child:"), }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": AnyStr("child:"), - "checkpoint_id": AnyStr(), - "checkpoint_map": AnyDict( - {"": AnyStr(), AnyStr("child:"): AnyStr()} - ), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": AnyStr("child:"), + "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + {"": AnyStr(), AnyStr("child:"): AnyStr()} + ), + } } - }, + ), ), ), ), @@ -5933,13 +6255,17 @@ def parent_2(state: State): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), ) # resume assert [c async for c in app.astream(None, config, subgraphs=True)] == [ @@ -5981,15 +6307,23 @@ def parent_2(state: State): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), ) ) + + if "shallow" in checkpointer_name: + return + # get outer graph history outer_history = [c async for c in app.aget_state_history(config)] assert ( @@ -6988,13 +7322,17 @@ def get_first_in_list(): "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), tasks=( PregelTask( id=AnyStr(), @@ -7080,13 +7418,17 @@ def get_first_in_list(): "thread_id": "14", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "14", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "14", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), tasks=( PregelTask( id=AnyStr(), @@ -7129,19 +7471,23 @@ def get_first_in_list(): "langgraph_checkpoint_ns": AnyStr("weather_graph:"), }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "14", - "checkpoint_ns": AnyStr("weather_graph:"), - "checkpoint_id": AnyStr(), - "checkpoint_map": AnyDict( - { - "": AnyStr(), - AnyStr("weather_graph:"): AnyStr(), - } - ), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "14", + "checkpoint_ns": AnyStr("weather_graph:"), + "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("weather_graph:"): AnyStr(), + } + ), + } } - }, + ), tasks=( PregelTask( id=AnyStr(), @@ -7180,13 +7526,17 @@ def get_first_in_list(): "thread_id": "14", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "14", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "14", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), tasks=( PregelTask( id=AnyStr(), @@ -7237,19 +7587,23 @@ def get_first_in_list(): "langgraph_checkpoint_ns": AnyStr("weather_graph:"), }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "14", - "checkpoint_ns": AnyStr("weather_graph:"), - "checkpoint_id": AnyStr(), - "checkpoint_map": AnyDict( - { - "": AnyStr(), - AnyStr("weather_graph:"): AnyStr(), - } - ), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "14", + "checkpoint_ns": AnyStr("weather_graph:"), + "checkpoint_id": AnyStr(), + "checkpoint_map": AnyDict( + { + "": AnyStr(), + AnyStr("weather_graph:"): AnyStr(), + } + ), + } } - }, + ), tasks=(), ), ), diff --git a/libs/langgraph/tests/test_pregel.py b/libs/langgraph/tests/test_pregel.py index 185c57765..b08cc87a7 100644 --- a/libs/langgraph/tests/test_pregel.py +++ b/libs/langgraph/tests/test_pregel.py @@ -4660,11 +4660,6 @@ class CustomParentState(TypedDict): ], "user_name": "Meow", } - expected_parent_config = ( - None - if "shallow" in checkpointer_name - else list(graph.checkpointer.list(config, limit=2))[-1].config - ) assert graph.get_state(config) == StateSnapshot( values={ "messages": [ @@ -4694,7 +4689,17 @@ class CustomParentState(TypedDict): "parents": {}, }, created_at=AnyStr(), - parent_config=expected_parent_config, + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } + } + ), tasks=(), ) diff --git a/libs/langgraph/tests/test_pregel_async.py b/libs/langgraph/tests/test_pregel_async.py index 45f1748fb..dffaaef57 100644 --- a/libs/langgraph/tests/test_pregel_async.py +++ b/libs/langgraph/tests/test_pregel_async.py @@ -75,6 +75,7 @@ ALL_CHECKPOINTERS_ASYNC, ALL_CHECKPOINTERS_ASYNC_PLUS_NONE, ALL_STORES_ASYNC, + REGULAR_CHECKPOINTERS_ASYNC, SHOULD_CHECK_SNAPSHOTS, awith_checkpointer, awith_store, @@ -347,22 +348,23 @@ async def tool_two_node(s: State) -> State: ) }, ] - assert [c.metadata async for c in tool_two.checkpointer.alist(thread1)] == [ - { - "parents": {}, - "source": "loop", - "step": 0, - "writes": None, - "thread_id": "1", - }, - { - "parents": {}, - "source": "input", - "step": -1, - "writes": {"__start__": {"my_key": "value ⛰️", "market": "DE"}}, - "thread_id": "1", - }, - ] + if "shallow" not in checkpointer_name: + assert [c.metadata async for c in tool_two.checkpointer.alist(thread1)] == [ + { + "parents": {}, + "source": "loop", + "step": 0, + "writes": None, + "thread_id": "1", + }, + { + "parents": {}, + "source": "input", + "step": -1, + "writes": {"__start__": {"my_key": "value ⛰️", "market": "DE"}}, + "thread_id": "1", + }, + ] tup = await tool_two.checkpointer.aget_tuple(thread1) assert await tool_two.aget_state(thread1) == StateSnapshot( values={"my_key": "value ⛰️", "market": "DE"}, @@ -390,9 +392,13 @@ async def tool_two_node(s: State) -> State: "writes": None, "thread_id": "1", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread1, limit=2)][ + -1 + ].config + ), ) # clear the interrupt and next tasks @@ -412,9 +418,13 @@ async def tool_two_node(s: State) -> State: "writes": {}, "thread_id": "1", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [c async for c in tool_two.checkpointer.alist(thread1, limit=2)][ + -1 + ].config + ), ) @@ -524,22 +534,25 @@ class State(TypedDict): ) }, ] - assert [c.metadata async for c in tool_two.checkpointer.alist(thread1root)] == [ - { - "parents": {}, - "source": "loop", - "step": 0, - "writes": None, - "thread_id": "1", - }, - { - "parents": {}, - "source": "input", - "step": -1, - "writes": {"__start__": {"my_key": "value ⛰️", "market": "DE"}}, - "thread_id": "1", - }, - ] + if "shallow" not in checkpointer_name: + assert [ + c.metadata async for c in tool_two.checkpointer.alist(thread1root) + ] == [ + { + "parents": {}, + "source": "loop", + "step": 0, + "writes": None, + "thread_id": "1", + }, + { + "parents": {}, + "source": "input", + "step": -1, + "writes": {"__start__": {"my_key": "value ⛰️", "market": "DE"}}, + "thread_id": "1", + }, + ] tup = await tool_two.checkpointer.aget_tuple(thread1) assert await tool_two.aget_state(thread1) == StateSnapshot( values={"my_key": "value ⛰️", "market": "DE"}, @@ -573,9 +586,13 @@ class State(TypedDict): "writes": None, "thread_id": "1", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1root, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in tool_two.checkpointer.alist(thread1root, limit=2) + ][-1].config + ), ) # clear the interrupt and next tasks @@ -595,9 +612,13 @@ class State(TypedDict): "writes": {}, "thread_id": "1", }, - parent_config=[ - c async for c in tool_two.checkpointer.alist(thread1root, limit=2) - ][-1].config, + parent_config=( + None + if "shallow" in checkpointer_name + else [ + c async for c in tool_two.checkpointer.alist(thread1root, limit=2) + ][-1].config + ), ) @@ -1754,6 +1775,10 @@ def reset(self): # both the pending write and the new write were applied, 1 + 2 + 3 = 6 assert await graph.ainvoke(None, thread1) == {"value": 6} + if "shallow" in checkpointer_name: + assert len([c async for c in checkpointer.alist(thread1)]) == 1 + return + # check all final checkpoints checkpoints = [c async for c in checkpointer.alist(thread1)] # we should have 3 @@ -1911,7 +1936,7 @@ def reset(self): ) -@pytest.mark.parametrize("checkpointer_name", ALL_CHECKPOINTERS_ASYNC) +@pytest.mark.parametrize("checkpointer_name", REGULAR_CHECKPOINTERS_ASYNC) async def test_run_from_checkpoint_id_retains_previous_writes( request: pytest.FixtureRequest, checkpointer_name: str, mocker: MockerFixture ) -> None: @@ -3260,13 +3285,17 @@ async def foo(call: ToolCall): "thread_id": "2", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "2", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "2", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), tasks=( PregelTask( id=AnyStr(), @@ -3343,13 +3372,17 @@ async def foo(call: ToolCall): "thread_id": "2", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "2", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "2", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), tasks=(), ) @@ -3572,6 +3605,9 @@ def raise_if_above_10(input: int) -> int: assert state.values.get("total") == 5 assert state.next == () + if "shallow" in checkpointer_name: + return + assert len([c async for c in app.aget_state_history(thread_1, limit=1)]) == 1 # list all checkpoints for thread 1 thread_1_history = [c async for c in app.aget_state_history(thread_1)] @@ -4279,13 +4315,17 @@ async def decider(data: State) -> str: "thread_id": "1", }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), ) async with assert_ctx_once(): @@ -5758,13 +5798,17 @@ class CustomParentState(TypedDict): "parents": {}, }, created_at=AnyStr(), - parent_config={ - "configurable": { - "thread_id": "1", - "checkpoint_ns": "", - "checkpoint_id": AnyStr(), + parent_config=( + None + if "shallow" in checkpointer_name + else { + "configurable": { + "thread_id": "1", + "checkpoint_ns": "", + "checkpoint_id": AnyStr(), + } } - }, + ), tasks=(), ) @@ -6281,6 +6325,9 @@ async def second_node(state: State): result = await graph.ainvoke({"steps": [], "attempt": 2}, config) assert result == {"steps": ["start", "node1", "node2"], "attempt": 2} + if "shallow" in checkpointer_name: + return + # Verify checkpoint history shows both attempts history = [c async for c in graph.aget_state_history(config)] assert len(history) == 6 # Initial + failed attempt + successful attempt