Add namespace listing #1889

Merged
merged 12 commits into from
Sep 27, 2024
13 changes: 13 additions & 0 deletions libs/checkpoint/langgraph/checkpoint/serde/jsonplus.py
@@ -1,4 +1,4 @@
import dataclasses

Check notice on line 1 in libs/checkpoint/langgraph/checkpoint/serde/jsonplus.py

GitHub Actions / benchmark

Benchmark results

fanout_to_subgraph_10x: Mean +- std dev: 60.5 ms +- 2.0 ms

WARNING: the benchmark result may be unstable
* the standard deviation (7.30 ms) is 13% of the mean (57.7 ms)
Try to rerun the benchmark with more runs, values and/or loops.
Run 'python -m pyperf system tune' command to reduce the system jitter.
Use pyperf stats, pyperf dump and pyperf hist to analyze results.
Use --quiet option to hide these warnings.

fanout_to_subgraph_10x_sync: Mean +- std dev: 57.7 ms +- 7.3 ms
fanout_to_subgraph_10x_checkpoint: Mean +- std dev: 77.7 ms +- 1.2 ms
fanout_to_subgraph_10x_checkpoint_sync: Mean +- std dev: 83.1 ms +- 1.6 ms
fanout_to_subgraph_100x: Mean +- std dev: 577 ms +- 16 ms
fanout_to_subgraph_100x_sync: Mean +- std dev: 510 ms +- 5 ms
fanout_to_subgraph_100x_checkpoint: Mean +- std dev: 808 ms +- 34 ms
fanout_to_subgraph_100x_checkpoint_sync: Mean +- std dev: 802 ms +- 8 ms
react_agent_10x: Mean +- std dev: 42.7 ms +- 4.2 ms
react_agent_10x_sync: Mean +- std dev: 30.2 ms +- 0.3 ms
react_agent_10x_checkpoint: Mean +- std dev: 53.9 ms +- 1.4 ms

WARNING: the benchmark result may be unstable
* the standard deviation (4.48 ms) is 10% of the mean (44.2 ms)
Try to rerun the benchmark with more runs, values and/or loops.
Run 'python -m pyperf system tune' command to reduce the system jitter.
Use pyperf stats, pyperf dump and pyperf hist to analyze results.
Use --quiet option to hide these warnings.

react_agent_10x_checkpoint_sync: Mean +- std dev: 44.2 ms +- 4.5 ms
react_agent_100x: Mean +- std dev: 420 ms +- 9 ms
react_agent_100x_sync: Mean +- std dev: 336 ms +- 4 ms
react_agent_100x_checkpoint: Mean +- std dev: 971 ms +- 19 ms
react_agent_100x_checkpoint_sync: Mean +- std dev: 871 ms +- 21 ms
wide_state_25x300: Mean +- std dev: 20.7 ms +- 0.4 ms
wide_state_25x300_sync: Mean +- std dev: 12.9 ms +- 0.2 ms
wide_state_25x300_checkpoint: Mean +- std dev: 240 ms +- 8 ms
wide_state_25x300_checkpoint_sync: Mean +- std dev: 238 ms +- 16 ms
wide_state_15x600: Mean +- std dev: 24.1 ms +- 0.5 ms
wide_state_15x600_sync: Mean +- std dev: 15.0 ms +- 0.2 ms
wide_state_15x600_checkpoint: Mean +- std dev: 425 ms +- 13 ms
wide_state_15x600_checkpoint_sync: Mean +- std dev: 426 ms +- 22 ms
wide_state_9x1200: Mean +- std dev: 24.0 ms +- 0.4 ms
wide_state_9x1200_sync: Mean +- std dev: 15.1 ms +- 0.2 ms
wide_state_9x1200_checkpoint: Mean +- std dev: 273 ms +- 8 ms
wide_state_9x1200_checkpoint_sync: Mean +- std dev: 275 ms +- 19 ms

Comparison against main

+-----------------------------------------+---------+-----------------------+
| Benchmark                               | main    | changes               |
+=========================================+=========+=======================+
| fanout_to_subgraph_10x_checkpoint       | 77.3 ms | 77.7 ms: 1.01x slower |
| wide_state_25x300_sync                  | 12.8 ms | 12.9 ms: 1.01x slower |
| wide_state_9x1200                       | 23.8 ms | 24.0 ms: 1.01x slower |
| react_agent_10x_sync                    | 29.8 ms | 30.2 ms: 1.01x slower |
| react_agent_10x_checkpoint              | 53.1 ms | 53.9 ms: 1.01x slower |
| react_agent_100x_sync                   | 331 ms  | 336 ms: 1.01x slower  |
| fanout_to_subgraph_10x                  | 59.6 ms | 60.5 ms: 1.01x slower |
| wide_state_9x1200_checkpoint            | 269 ms  | 273 ms: 1.01x slower  |
| wide_state_15x600_checkpoint_sync       | 420 ms  | 426 ms: 1.02x slower  |
| fanout_to_subgraph_100x_sync            | 502 ms  | 510 ms: 1.02x slower  |
| wide_state_25x300                       | 20.4 ms | 20.7 ms: 1.02x slower |
| fanout_to_subgraph_10x_checkpoint_sync  | 81.8 ms | 83.1 ms: 1.02x slower |
| wide_state_15x600_sync                  | 14.8 ms | 15.0 ms: 1.02x slower |
| fanout_to_subgraph_100x_checkpoint_sync | 789 ms  | 802 ms: 1.02x slower  |
| wide_state_15x600_checkpoint            | 418 ms  | 425 ms: 1.02x slower  |
| react_agent_100x                        | 412 ms  | 420 ms: 1.02x slower  |
| wide_state_15x600                       | 23.6 ms | 24.1 ms: 1.02x slower |
| fanout_to_subgraph_100x                 | 563 ms  | 577 ms: 1.02x slower  |
| react_agent_10x                         | 41.6 ms | 42.7 ms: 1.03x slower |
| wide_state_9x1200_sync                  | 14.8 ms | 15.1 ms: 1.03x slower |
| wide_state_9x1200_checkpoint_sync       | 267 ms  | 275 ms: 1.03x slower  |
| react_agent_10x_checkpoint_sync         | 42.9 ms | 44.2 ms: 1.03x slower |
| fanout_to_subgraph_100x_checkpoint      | 775 ms  | 808 ms: 1.04x slower  |
| react_agent_100x_checkpoint             | 930 ms  | 971 ms: 1.04x slower  |
| react_agent_100x_checkpoint_sync        | 828 ms  | 871 ms: 1.05x slower  |
+-----------------------------------------+---------+-----------------------+
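A note on reading the "changes" column: each entry is the ratio of the two mean timings. The sketch below recomputes it from the rounded values printed in the table. `describe_change` is a hypothetical helper written for this note, not part of pyperf, and because the benchmark tool presumably works from unrounded means, a recomputed ratio can differ from the printed one in the last digit.

```python
def describe_change(main_ms: float, changes_ms: float) -> str:
    """Format a timing change as 'N.NNx slower' or 'N.NNx faster' (hypothetical helper)."""
    if changes_ms >= main_ms:
        # The change is at least as slow as main: report changes / main.
        return f"{changes_ms / main_ms:.2f}x slower"
    # The change is faster: report main / changes so the ratio stays above 1.
    return f"{main_ms / changes_ms:.2f}x faster"


# Rows taken from the comparison table (values in ms).
print(describe_change(775, 808))    # fanout_to_subgraph_100x_checkpoint
print(describe_change(828, 871))    # react_agent_100x_checkpoint_sync
print(describe_change(77.3, 77.7))  # fanout_to_subgraph_10x_checkpoint
```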
import decimal
import importlib
import json
@@ -26,6 +26,7 @@

from langgraph.checkpoint.serde.base import SerializerProtocol
from langgraph.checkpoint.serde.types import SendProtocol
from langgraph.store.base import Item

LC_REVIVER = Reviver()

@@ -403,6 +404,18 @@
),
),
)
elif isinstance(obj, Item):
return msgpack.ExtType(
EXT_CONSTRUCTOR_KW_ARGS,
_msgpack_enc(
(
obj.__class__.__module__,
obj.__class__.__name__,
{k: getattr(obj, k) for k in obj.__slots__},
),
),
)

elif isinstance(obj, BaseException):
return repr(obj)
else:
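For reviewers unfamiliar with the constructor-kwargs encoding used in the new `Item` branch: the encoder emits a `(module, class name, kwargs)` triple built from `__slots__`. The sketch below shows that idea with the standard library only, and assumes the decoding side re-imports the class and calls it with the kwargs (the decoder is not shown in this diff). The msgpack `ExtType` wrapping is omitted; the stand-in `Item` class, the `demo_store` module name, and both helper functions are hypothetical.

```python
import importlib
import sys
import types
from typing import Any


class Item:
    """Stand-in for the slotted Item class (illustration only)."""

    __slots__ = ("value", "key", "namespace")

    def __init__(self, *, value: dict, key: str, namespace: tuple):
        self.value = value
        self.key = key
        self.namespace = tuple(namespace)


# Register the class under a hypothetical module name so that importlib can
# resolve it no matter how this snippet is executed.
_demo = types.ModuleType("demo_store")
Item.__module__ = "demo_store"
_demo.Item = Item
sys.modules["demo_store"] = _demo


def to_constructor_kwargs(obj: Any) -> tuple:
    """Build the (module, class name, kwargs) triple from the object's slots."""
    cls = obj.__class__
    return (cls.__module__, cls.__name__, {k: getattr(obj, k) for k in obj.__slots__})


def from_constructor_kwargs(payload: tuple) -> Any:
    """Assumed decoding step: import the class and call it with keyword arguments."""
    module_name, class_name, kwargs = payload
    cls = getattr(importlib.import_module(module_name), class_name)
    return cls(**kwargs)


original = Item(value={"n": 1}, key="k1", namespace=("docs", "user123"))
payload = to_constructor_kwargs(original)
restored = from_constructor_kwargs(payload)
```

This only works because every slot name is also a keyword argument of `__init__`, which is the case for the new `Item` class in this PR.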
264 changes: 207 additions & 57 deletions libs/checkpoint/langgraph/store/base.py
@@ -5,55 +5,61 @@
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Iterable, NamedTuple, Optional, Union
from typing import Any, Iterable, Literal, NamedTuple, Optional, Union


@dataclass
class Item:
"""Represents a stored item with metadata."""

value: dict[str, Any]
"""The stored data as a dictionary.

Keys are filterable.
"""Represents a stored item with metadata.

Args:
value (dict[str, Any]): The stored data as a dictionary. Keys are filterable.
key (str): Unique identifier within the namespace.
namespace (tuple[str, ...]): Hierarchical path defining the collection in which this document resides.
Represented as a tuple of strings, allowing for nested categorization.
For example: ("documents", "user123")
created_at (datetime): Timestamp of item creation.
updated_at (datetime): Timestamp of last update.
"""

scores: dict[str, float]
"""Relevance scores for the item.

Keys can include built-in scores like 'recency' and 'relevance',
as well as any key present in the 'value' dictionary. This allows
for multi-dimensional scoring of items.
"""

id: str
"""Unique identifier within the namespace."""

namespace: tuple[str, ...]
"""Hierarchical path defining the collection in which this document resides.

Represented as a tuple of strings, allowing for nested categorization.
For example: ("documents", 'user123')
"""

created_at: datetime
"""Timestamp of item creation."""

updated_at: datetime
"""Timestamp of last update."""
__slots__ = ("value", "key", "namespace", "created_at", "updated_at")

last_accessed_at: datetime
"""Timestamp of last access."""
def __init__(
self,
*,
value: dict[str, Any],
key: str,
namespace: tuple[str, ...],
created_at: datetime,
updated_at: datetime,
):
self.value = value
self.key = key
self.namespace = tuple(namespace)
self.created_at = created_at
self.updated_at = updated_at

def __eq__(self, other: object) -> bool:
if not isinstance(other, Item):
return False
return (
self.value == other.value
and self.key == other.key
and self.namespace == other.namespace
and self.created_at == other.created_at
and self.updated_at == other.updated_at
)

def __hash__(self) -> int:
return hash((self.namespace, self.key))
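One consequence of the new `__eq__` and `__hash__` is worth spelling out: equality compares every field, while the hash uses only `(namespace, key)`. The sketch below copies the class from this diff into a standalone snippet to show that two versions of the same item share a hash but are not equal. The sample values are made up for illustration.

```python
from datetime import datetime, timezone
from typing import Any


class Item:
    """Standalone copy of the Item class from this diff."""

    __slots__ = ("value", "key", "namespace", "created_at", "updated_at")

    def __init__(
        self,
        *,
        value: dict[str, Any],
        key: str,
        namespace: tuple[str, ...],
        created_at: datetime,
        updated_at: datetime,
    ):
        self.value = value
        self.key = key
        self.namespace = tuple(namespace)  # lists are normalized to tuples
        self.created_at = created_at
        self.updated_at = updated_at

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Item):
            return False
        return (
            self.value == other.value
            and self.key == other.key
            and self.namespace == other.namespace
            and self.created_at == other.created_at
            and self.updated_at == other.updated_at
        )

    def __hash__(self) -> int:
        return hash((self.namespace, self.key))


now = datetime(2024, 9, 27, tzinfo=timezone.utc)
v1 = Item(value={"v": 1}, key="k", namespace=["docs", "user123"], created_at=now, updated_at=now)
v2 = Item(value={"v": 2}, key="k", namespace=("docs", "user123"), created_at=now, updated_at=now)

same_hash = hash(v1) == hash(v2)  # same (namespace, key) pair
equal = v1 == v2                  # values differ, so the items are not equal
```

Because `__slots__` is defined and there is no `__dict__`, assigning an attribute outside the five listed names raises `AttributeError`.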


class GetOp(NamedTuple):
"""Operation to retrieve an item by namespace and ID."""
"""Operation to retrieve an item by namespace and key."""

namespace: tuple[str, ...]
"""Hierarchical path for the item."""
id: str
key: str
"""Unique identifier within the namespace."""


@@ -80,7 +86,7 @@ class PutOp(NamedTuple):
For example: ("documents", "user123")
"""

id: str
key: str
"""Unique identifier for the document.

Should be distinct within its namespace.
@@ -97,8 +103,48 @@ class PutOp(NamedTuple):
"""


Op = Union[GetOp, SearchOp, PutOp]
Result = Union[Item, list[Item], None]
NameSpacePath = tuple[Union[str, Literal["*"]], ...]

NamespaceMatchType = Literal["prefix", "suffix"]


class MatchCondition(NamedTuple):
"""Represents a single match condition."""

match_type: NamespaceMatchType
path: NameSpacePath
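The `NameSpacePath` type allows `"*"` as a label, which suggests wildcard matching, although the diff does not show how a store evaluates a `MatchCondition`. The following is a minimal sketch under that assumption: `matches` is a hypothetical helper, and treating `"*"` as "any single label" is a guess at the intended semantics rather than the library's confirmed behavior.

```python
from typing import Literal, NamedTuple


class MatchCondition(NamedTuple):
    """Standalone copy of the tuple added in this diff."""

    match_type: Literal["prefix", "suffix"]
    path: tuple[str, ...]


def matches(namespace: tuple[str, ...], condition: MatchCondition) -> bool:
    """Return True if the namespace satisfies the condition (hypothetical helper)."""
    size = len(condition.path)
    if len(namespace) < size:
        return False
    if condition.match_type == "prefix":
        window = namespace[:size]
    elif condition.match_type == "suffix":
        window = namespace[len(namespace) - size:]
    else:
        raise ValueError(f"Unsupported match type: {condition.match_type}")
    # Assumption: "*" stands for exactly one arbitrary label.
    return all(p == "*" or p == label for p, label in zip(condition.path, window))


ns = ("users", "u1", "docs")
prefix_hit = matches(ns, MatchCondition("prefix", ("users", "*")))
suffix_hit = matches(ns, MatchCondition("suffix", ("docs",)))
prefix_miss = matches(ns, MatchCondition("prefix", ("teams",)))
```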


class ListNamespacesOp(NamedTuple):
"""Operation to list namespaces with optional match conditions."""

match_conditions: Optional[tuple[MatchCondition, ...]] = None
"""A tuple of match conditions to apply to namespaces."""

max_depth: Optional[int] = None
"""Return namespaces up to this depth in the hierarchy."""

limit: int = 100
"""Maximum number of namespaces to return."""

offset: int = 0
"""Number of namespaces to skip before returning results."""


Op = Union[GetOp, SearchOp, PutOp, ListNamespacesOp]
Result = Union[Item, list[Item], list[tuple[str, ...]], None]


class InvalidNamespaceError(ValueError):
"""Provided namespace is invalid."""


def _validate_namespace(namespace: tuple[str, ...]) -> None:
for label in namespace:
if "." in label:
raise InvalidNamespaceError(
f"Invalid namespace label '{label}'. Namespace labels cannot contain periods ('.')."
)


class BaseStore(ABC):
@@ -114,17 +160,17 @@ def batch(self, ops: Iterable[Op]) -> list[Result]:
async def abatch(self, ops: Iterable[Op]) -> list[Result]:
"""Execute a batch of operations asynchronously."""

def get(self, namespace: tuple[str, ...], id: str) -> Optional[Item]:
def get(self, namespace: tuple[str, ...], key: str) -> Optional[Item]:
"""Retrieve a single item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
key: Unique identifier within the namespace.

Returns:
The retrieved item or None if not found.
"""
return self.batch([GetOp(namespace, id)])[0]
return self.batch([GetOp(namespace, key)])[0]

def search(
self,
@@ -148,36 +194,88 @@ def search(
"""
return self.batch([SearchOp(namespace_prefix, filter, limit, offset)])[0]

def put(self, namespace: tuple[str, ...], id: str, value: dict[str, Any]) -> None:
def put(self, namespace: tuple[str, ...], key: str, value: dict[str, Any]) -> None:
"""Store or update an item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
key: Unique identifier within the namespace.
value: Dictionary containing the item's data.
"""
self.batch([PutOp(namespace, id, value)])
_validate_namespace(namespace)
self.batch([PutOp(namespace, key, value)])

def delete(self, namespace: tuple[str, ...], id: str) -> None:
def delete(self, namespace: tuple[str, ...], key: str) -> None:
"""Delete an item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
key: Unique identifier within the namespace.
"""
self.batch([PutOp(namespace, id, None)])
self.batch([PutOp(namespace, key, None)])
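Note that `delete` is expressed as a `PutOp` whose value is `None`, so a `batch` implementation has to treat that case as a removal. The sketch below shows one way a backend might do this. `ToyStore` is a hypothetical dictionary-backed class that returns raw values instead of `Item` objects, and the op tuples are simplified copies of the ones in this file.

```python
from typing import Any, NamedTuple, Optional


class GetOp(NamedTuple):
    namespace: tuple[str, ...]
    key: str


class PutOp(NamedTuple):
    namespace: tuple[str, ...]
    key: str
    value: Optional[dict[str, Any]]  # None signals deletion


class ToyStore:
    """Hypothetical in-memory backend, for illustration only."""

    def __init__(self) -> None:
        self._data: dict[tuple[tuple[str, ...], str], dict[str, Any]] = {}

    def batch(self, ops):
        results = []
        for op in ops:
            if isinstance(op, GetOp):
                results.append(self._data.get((op.namespace, op.key)))
            elif isinstance(op, PutOp):
                if op.value is None:
                    # A put with no value removes the entry if it exists.
                    self._data.pop((op.namespace, op.key), None)
                else:
                    self._data[(op.namespace, op.key)] = op.value
                results.append(None)
            else:
                raise TypeError(f"Unsupported operation: {op!r}")
        return results


store = ToyStore()
ns = ("documents", "user123")
store.batch([PutOp(ns, "doc1", {"title": "hello"})])
before = store.batch([GetOp(ns, "doc1")])[0]
store.batch([PutOp(ns, "doc1", None)])  # equivalent of delete(ns, "doc1")
after = store.batch([GetOp(ns, "doc1")])[0]
```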

def list_namespaces(
self,
*,
prefix: Optional[NameSpacePath] = None,
suffix: Optional[NameSpacePath] = None,
max_depth: Optional[int] = None,
limit: int = 100,
offset: int = 0,
) -> list[tuple[str, ...]]:
"""List and filter namespaces in the store.

Used to explore the organization of data,
find specific collections, or navigate the namespace hierarchy.

Args:
prefix (Optional[Tuple[str, ...]]): Filter namespaces that start with this path.
suffix (Optional[Tuple[str, ...]]): Filter namespaces that end with this path.
max_depth (Optional[int]): Return namespaces up to this depth in the hierarchy.
Namespaces deeper than this level will be truncated to this depth.
limit (int): Maximum number of namespaces to return (default 100).
offset (int): Number of namespaces to skip for pagination (default 0).

async def aget(self, namespace: tuple[str, ...], id: str) -> Optional[Item]:
Returns:
List[Tuple[str, ...]]: A list of namespace tuples that match the criteria.
Each tuple represents a full namespace path up to `max_depth`.

Examples:

Setting max_depth=3. Given the namespaces:
# ("a", "b", "c")
# ("a", "b", "d", "e")
# ("a", "b", "d", "i")
# ("a", "b", "f")
# ("a", "c", "f")
store.list_namespaces(prefix=("a", "b"), max_depth=3)
# [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
"""
match_conditions = []
if prefix:
match_conditions.append(MatchCondition(match_type="prefix", path=prefix))
if suffix:
match_conditions.append(MatchCondition(match_type="suffix", path=suffix))

op = ListNamespacesOp(
match_conditions=tuple(match_conditions),
max_depth=max_depth,
limit=limit,
offset=offset,
)
return self.batch([op])[0]
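To make the docstring example concrete, here is a rough sketch of how a backend might evaluate the listing: filter by prefix, truncate to `max_depth`, drop duplicates, then paginate. `list_namespaces_sketch` is a hypothetical free function, not the store's implementation, and the sorted result order is an assumption (the diff does not specify ordering). Suffix and wildcard handling are left out.

```python
from typing import Iterable, Optional


def list_namespaces_sketch(
    namespaces: Iterable[tuple[str, ...]],
    *,
    prefix: Optional[tuple[str, ...]] = None,
    max_depth: Optional[int] = None,
    limit: int = 100,
    offset: int = 0,
) -> list[tuple[str, ...]]:
    # Keep namespaces that start with the prefix (all of them if no prefix is given).
    selected = [
        ns for ns in namespaces
        if prefix is None or ns[: len(prefix)] == tuple(prefix)
    ]
    # Truncate deeper namespaces to max_depth labels.
    if max_depth is not None:
        selected = [ns[:max_depth] for ns in selected]
    # Truncation can create duplicates; deduplicate and sort (ordering is assumed).
    unique = sorted(set(selected))
    return unique[offset : offset + limit]


stored = [
    ("a", "b", "c"),
    ("a", "b", "d", "e"),
    ("a", "b", "d", "i"),
    ("a", "b", "f"),
    ("a", "c", "f"),
]
listed = list_namespaces_sketch(stored, prefix=("a", "b"), max_depth=3)
second_page = list_namespaces_sketch(stored, prefix=("a", "b"), max_depth=3, limit=2, offset=1)
```

With the five namespaces from the docstring, `("a", "b", "d", "e")` and `("a", "b", "d", "i")` both collapse to `("a", "b", "d")`, which is why the result has three entries rather than four.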

async def aget(self, namespace: tuple[str, ...], key: str) -> Optional[Item]:
"""Asynchronously retrieve a single item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
key: Unique identifier within the namespace.

Returns:
The retrieved item or None if not found.
"""
return (await self.abatch([GetOp(namespace, id)]))[0]
return (await self.abatch([GetOp(namespace, key)]))[0]

async def asearch(
self,
@@ -204,22 +302,74 @@ async def asearch(
]

async def aput(
self, namespace: tuple[str, ...], id: str, value: dict[str, Any]
self, namespace: tuple[str, ...], key: str, value: dict[str, Any]
) -> None:
"""Asynchronously store or update an item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
key: Unique identifier within the namespace.
value: Dictionary containing the item's data.
"""
await self.abatch([PutOp(namespace, id, value)])
_validate_namespace(namespace)
await self.abatch([PutOp(namespace, key, value)])

async def adelete(self, namespace: tuple[str, ...], id: str) -> None:
async def adelete(self, namespace: tuple[str, ...], key: str) -> None:
"""Asynchronously delete an item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
key: Unique identifier within the namespace.
"""
await self.abatch([PutOp(namespace, key, None)])

async def alist_namespaces(
self,
*,
prefix: Optional[NameSpacePath] = None,
suffix: Optional[NameSpacePath] = None,
max_depth: Optional[int] = None,
limit: int = 100,
offset: int = 0,
) -> list[tuple[str, ...]]:
"""List and filter namespaces in the store asynchronously.

Used to explore the organization of data,
find specific collections, or navigate the namespace hierarchy.

Args:
prefix (Optional[Tuple[str, ...]]): Filter namespaces that start with this path.
suffix (Optional[Tuple[str, ...]]): Filter namespaces that end with this path.
max_depth (Optional[int]): Return namespaces up to this depth in the hierarchy.
Namespaces deeper than this level will be truncated to this depth.
limit (int): Maximum number of namespaces to return (default 100).
offset (int): Number of namespaces to skip for pagination (default 0).

Returns:
List[Tuple[str, ...]]: A list of namespace tuples that match the criteria.
Each tuple represents a full namespace path up to `max_depth`.

Examples:

Setting max_depth=3. Given the namespaces:
# ("a", "b", "c")
# ("a", "b", "d", "e")
# ("a", "b", "d", "i")
# ("a", "b", "f")
# ("a", "c", "f")
await store.alist_namespaces(prefix=("a", "b"), max_depth=3)
# [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
"""
await self.abatch([PutOp(namespace, id, None)])
match_conditions = []
if prefix:
match_conditions.append(MatchCondition(match_type="prefix", path=prefix))
if suffix:
match_conditions.append(MatchCondition(match_type="suffix", path=suffix))

op = ListNamespacesOp(
match_conditions=tuple(match_conditions),
max_depth=max_depth,
limit=limit,
offset=offset,
)
return (await self.abatch([op]))[0]
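Since `limit` defaults to 100, callers that need every namespace have to page with `offset`. The sketch below shows such a loop against a stub. `ToyAsyncStore` and `collect_all` are hypothetical names, and the stub only mimics the `limit` and `offset` parameters of `alist_namespaces`.

```python
import asyncio


class ToyAsyncStore:
    """Hypothetical stub that mimics the pagination parameters of alist_namespaces."""

    def __init__(self, namespaces):
        self._namespaces = sorted(set(namespaces))

    async def alist_namespaces(self, *, limit: int = 100, offset: int = 0):
        return self._namespaces[offset : offset + limit]


async def collect_all(store, page_size: int = 2):
    """Fetch pages until an empty page signals the end."""
    found = []
    offset = 0
    while True:
        page = await store.alist_namespaces(limit=page_size, offset=offset)
        if not page:
            break
        found.extend(page)
        offset += len(page)
    return found


toy = ToyAsyncStore([("a", "b"), ("a", "c"), ("b", "d"), ("c",), ("c", "e")])
all_namespaces = asyncio.run(collect_all(toy))
```

Offset-based paging like this can skip or repeat entries if namespaces are added or removed between calls, so it is best suited to exploration rather than exact enumeration.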