donotmerge #1865

Closed
wants to merge 28 commits
28 commits
90bd12a  Implement new Store interface (nfcampos, Sep 25, 2024)
a72afdc  Fix test (nfcampos, Sep 25, 2024)
595c5ea  Lint (nfcampos, Sep 25, 2024)
cab8a24  Spelling (hinthornw, Sep 25, 2024)
d449ab5  Support stringized annotations (hinthornw, Sep 25, 2024)
6b1bf06  String Match (hinthornw, Sep 25, 2024)
fd226c6  Merge pull request #1835 from langchain-ai/wfh/other_annos (nfcampos, Sep 25, 2024)
3bf4270  Inject store (nfcampos, Sep 25, 2024)
0a9cd3a  Merge BaseStore and Store (nfcampos, Sep 25, 2024)
4ab823f  Slots for MemoryStore (nfcampos, Sep 25, 2024)
ebdc989  Add tests for MemoryStore in graph (hinthornw, Sep 26, 2024)
6af537b  Fixes (nfcampos, Sep 26, 2024)
c056300  Remove unimplemented args (nfcampos, Sep 26, 2024)
3ee8758  Update test signature (hinthornw, Sep 26, 2024)
7225cc2  Lint (nfcampos, Sep 26, 2024)
615b06b  Remove (nfcampos, Sep 26, 2024)
22b7cf4  Merge branch 'wfh/add_test' of https://github.com/langchain-ai/langgr… (bracesproul, Sep 26, 2024)
f97db1b  Update etst (hinthornw, Sep 26, 2024)
2b05c41  Merge branch 'wfh/add_test' of https://github.com/langchain-ai/langgr… (bracesproul, Sep 26, 2024)
944676a  Update kafka test (hinthornw, Sep 26, 2024)
a6b4c0c  Mv to checkpoint lib (hinthornw, Sep 26, 2024)
ba7f706  Move to checkpoint package (hinthornw, Sep 26, 2024)
4664b59  Merge branch 'wfh/add_test' of https://github.com/langchain-ai/langgr… (bracesproul, Sep 26, 2024)
99a88eb  Lint fixes (hinthornw, Sep 26, 2024)
d309f58  py.typed (hinthornw, Sep 26, 2024)
3daae2d  Update (hinthornw, Sep 26, 2024)
b9cf088  doc (hinthornw, Sep 26, 2024)
5c07393  Merge branch 'wfh/add_test' of https://github.com/langchain-ai/langgr… (bracesproul, Sep 26, 2024)
225 changes: 225 additions & 0 deletions libs/checkpoint/langgraph/store/base.py
@@ -0,0 +1,225 @@
"""Base classes and types for persistent key-value stores.

Stores enable persistence and memory that can be shared across threads,
scoped to user IDs, assistant IDs, or other arbitrary namespaces.
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Iterable, NamedTuple, Optional, Union


@dataclass
class Item:
"""Represents a stored item with metadata."""

value: dict[str, Any]
"""The stored data as a dictionary.

Keys are filterable.
"""

scores: dict[str, float]
"""Relevance scores for the item.

Keys can include built-in scores like 'recency' and 'relevance',
as well as any key present in the 'value' dictionary. This allows
for multi-dimensional scoring of items.
"""

id: str
"""Unique identifier within the namespace."""

namespace: tuple[str, ...]
"""Hierarchical path defining the collection in which this document resides.

Represented as a tuple of strings, allowing for nested categorization.
For example: ("documents", 'user123')
"""

created_at: datetime
"""Timestamp of item creation."""

updated_at: datetime
"""Timestamp of last update."""

last_accessed_at: datetime
"""Timestamp of last access."""


class GetOp(NamedTuple):
"""Operation to retrieve an item by namespace and ID."""

namespace: tuple[str, ...]
"""Hierarchical path for the item."""
id: str
"""Unique identifier within the namespace."""


class SearchOp(NamedTuple):
"""Operation to search for items within a namespace prefix."""

namespace_prefix: tuple[str, ...]
"""Hierarchical path prefix to search within."""
filter: Optional[dict[str, Any]] = None
"""Key-value pairs to filter results."""
limit: int = 10
"""Maximum number of items to return."""
offset: int = 0
"""Number of items to skip before returning results."""


class PutOp(NamedTuple):
"""Operation to store, update, or delete an item."""

namespace: tuple[str, ...]
"""Hierarchical path for the item.

Represented as a tuple of strings, allowing for nested categorization.
For example: ("documents", "user123")
"""

id: str
"""Unique identifier for the document.

Should be distinct within its namespace.
"""

value: Optional[dict[str, Any]]
"""Data to be stored, or None to delete the item.

Schema:
- Should be a dictionary where:
- Keys are strings representing field names
- Values can be of any serializable type
- If None, it indicates that the item should be deleted
"""


Op = Union[GetOp, SearchOp, PutOp]
Result = Union[Item, list[Item], None]


class BaseStore(ABC):
"""Abstract base class for key-value stores."""

__slots__ = ("__weakref__",)

@abstractmethod
def batch(self, ops: Iterable[Op]) -> list[Result]:
"""Execute a batch of operations synchronously."""

@abstractmethod
async def abatch(self, ops: Iterable[Op]) -> list[Result]:
"""Execute a batch of operations asynchronously."""

def get(self, namespace: tuple[str, ...], id: str) -> Optional[Item]:
"""Retrieve a single item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.

Returns:
The retrieved item or None if not found.
"""
return self.batch([GetOp(namespace, id)])[0]

def search(
self,
namespace_prefix: tuple[str, ...],
/,
*,
filter: Optional[dict[str, Any]] = None,
limit: int = 10,
offset: int = 0,
) -> list[Item]:
"""Search for items within a namespace prefix.

Args:
namespace_prefix: Hierarchical path prefix to search within.
filter: Key-value pairs to filter results.
limit: Maximum number of items to return.
offset: Number of items to skip before returning results.

Returns:
List of items matching the search criteria.
"""
return self.batch([SearchOp(namespace_prefix, filter, limit, offset)])[0]

def put(self, namespace: tuple[str, ...], id: str, value: dict[str, Any]) -> None:
"""Store or update an item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
value: Dictionary containing the item's data.
"""
self.batch([PutOp(namespace, id, value)])

def delete(self, namespace: tuple[str, ...], id: str) -> None:
"""Delete an item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
"""
self.batch([PutOp(namespace, id, None)])

async def aget(self, namespace: tuple[str, ...], id: str) -> Optional[Item]:
"""Asynchronously retrieve a single item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.

Returns:
The retrieved item or None if not found.
"""
return (await self.abatch([GetOp(namespace, id)]))[0]

async def asearch(
self,
namespace_prefix: tuple[str, ...],
/,
*,
filter: Optional[dict[str, Any]] = None,
limit: int = 10,
offset: int = 0,
) -> list[Item]:
"""Asynchronously search for items within a namespace prefix.

Args:
namespace_prefix: Hierarchical path prefix to search within.
filter: Key-value pairs to filter results.
limit: Maximum number of items to return.
offset: Number of items to skip before returning results.

Returns:
List of items matching the search criteria.
"""
return (await self.abatch([SearchOp(namespace_prefix, filter, limit, offset)]))[
0
]

async def aput(
self, namespace: tuple[str, ...], id: str, value: dict[str, Any]
) -> None:
"""Asynchronously store or update an item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
value: Dictionary containing the item's data.
"""
await self.abatch([PutOp(namespace, id, value)])

async def adelete(self, namespace: tuple[str, ...], id: str) -> None:
"""Asynchronously delete an item.

Args:
namespace: Hierarchical path for the item.
id: Unique identifier within the namespace.
"""
await self.abatch([PutOp(namespace, id, None)])
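
A minimal usage sketch of the store API defined above (not part of the diff). It uses the InMemoryStore added in libs/checkpoint/langgraph/store/memory.py later in this PR; the namespace, ids, and values are illustrative.

# Illustrative only: exercises get/put/search/delete from BaseStore,
# all of which funnel into batch() under the hood.
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()
namespace = ("documents", "user123")  # hypothetical namespace

# put() stores or updates; delete() is a put() with value=None.
store.put(namespace, "doc-1", {"title": "Notes", "topic": "stores"})
store.put(namespace, "doc-2", {"title": "Draft", "topic": "agents"})

item = store.get(namespace, "doc-1")
assert item is not None and item.value["title"] == "Notes"

# search() matches any namespace starting with the given prefix and
# filters on exact key/value pairs of Item.value.
results = store.search(("documents",), filter={"topic": "stores"})
assert [it.id for it in results] == ["doc-1"]

store.delete(namespace, "doc-2")
assert store.get(namespace, "doc-2") is None
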
90 changes: 90 additions & 0 deletions libs/checkpoint/langgraph/store/batch.py
@@ -0,0 +1,90 @@
import asyncio
import weakref
from typing import Any, Optional

from langgraph.store.base import BaseStore, GetOp, Item, Op, PutOp, SearchOp


class AsyncBatchedBaseStore(BaseStore):
"""Efficiently batch operations in a background task."""

__slots__ = ("_loop", "_aqueue", "_task")

def __init__(self) -> None:
self._loop = asyncio.get_running_loop()
self._aqueue: dict[asyncio.Future, Op] = {}
self._task = self._loop.create_task(_run(self._aqueue, weakref.ref(self)))

def __del__(self) -> None:
self._task.cancel()

async def aget(
self,
namespace: tuple[str, ...],
id: str,
) -> Optional[Item]:
fut = self._loop.create_future()
self._aqueue[fut] = GetOp(namespace, id)
return await fut

async def asearch(
self,
namespace_prefix: tuple[str, ...],
/,
*,
query: Optional[str] = None,
filter: Optional[dict[str, Any]] = None,
weights: Optional[dict[str, float]] = None,
limit: int = 10,
offset: int = 0,
) -> list[Item]:
fut = self._loop.create_future()
self._aqueue[fut] = SearchOp(namespace_prefix, filter, limit, offset)
return await fut

async def aput(
self,
namespace: tuple[str, ...],
id: str,
value: dict[str, Any],
) -> None:
fut = self._loop.create_future()
self._aqueue[fut] = PutOp(namespace, id, value)
return await fut

async def adelete(
self,
namespace: tuple[str, ...],
id: str,
) -> None:
fut = self._loop.create_future()
self._aqueue[fut] = PutOp(namespace, id, None)
return await fut


async def _run(
    aqueue: dict[asyncio.Future, Op], store: weakref.ReferenceType[BaseStore]
) -> None:
    while True:
        await asyncio.sleep(0)
        if not aqueue:
            continue
        if s := store():
            # get the operations to run
            taken = aqueue.copy()
            # action each operation
            try:
                results = await s.abatch(taken.values())
                # set the results of each operation
                for fut, result in zip(taken, results):
                    fut.set_result(result)
            except Exception as e:
                for fut in taken:
                    fut.set_exception(e)
            # remove the operations from the queue
            for fut in taken:
                del aqueue[fut]
        else:
            break
        # remove strong ref to store
        del s
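
A minimal sketch (an assumption, not code from this PR) of how AsyncBatchedBaseStore is intended to be used: subclass it, implement batch()/abatch(), and awaits on the async convenience methods are queued as futures and resolved by the background _run task via abatch(). The BatchedMemoryStore wrapper and its delegation to InMemoryStore (defined in memory.py below) are hypothetical.

import asyncio
from typing import Iterable

from langgraph.store.base import Op, Result
from langgraph.store.batch import AsyncBatchedBaseStore
from langgraph.store.memory import InMemoryStore


class BatchedMemoryStore(AsyncBatchedBaseStore):
    """Hypothetical subclass delegating each drained batch to an InMemoryStore."""

    def __init__(self) -> None:
        super().__init__()  # captures the running loop and starts the drain task
        self._inner = InMemoryStore()

    def batch(self, ops: Iterable[Op]) -> list[Result]:
        return self._inner.batch(ops)

    async def abatch(self, ops: Iterable[Op]) -> list[Result]:
        # For the sketch, async batching just reuses the sync in-memory batch.
        return self._inner.batch(ops)


async def main() -> None:
    store = BatchedMemoryStore()
    ns = ("assistants", "a1")  # hypothetical namespace
    await store.aput(ns, "k1", {"v": 1})
    # Both gets enqueue futures; the background task drains the queue and
    # resolves them with the results of abatch().
    items = await asyncio.gather(store.aget(ns, "k1"), store.aget(ns, "missing"))
    assert items[0].value == {"v": 1} and items[1] is None


asyncio.run(main())
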
68 changes: 68 additions & 0 deletions libs/checkpoint/langgraph/store/memory.py
@@ -0,0 +1,68 @@
from collections import defaultdict
from datetime import datetime, timezone
from typing import Iterable

from langgraph.store.base import BaseStore, GetOp, Item, Op, PutOp, Result, SearchOp


class InMemoryStore(BaseStore):
"""A KV store backed by an in-memory python dictionary.

Useful for testing/experimentation and lightweight PoC's.
For actual persistence, use a Store backed by a proper database.
"""

__slots__ = ("_data",)

def __init__(self) -> None:
self._data: dict[tuple[str, ...], dict[str, Item]] = defaultdict(dict)

def batch(self, ops: Iterable[Op]) -> list[Result]:
results: list[Result] = []
for op in ops:
if isinstance(op, GetOp):
item = self._data[op.namespace].get(op.id)
if item is not None:
item.last_accessed_at = datetime.now(timezone.utc)
results.append(item)
elif isinstance(op, SearchOp):
candidates = [
item
for namespace, items in self._data.items()
if (
namespace[: len(op.namespace_prefix)] == op.namespace_prefix
if len(namespace) >= len(op.namespace_prefix)
else False
)
for item in items.values()
]
if op.filter:
candidates = [
item
for item in candidates
if item.value.items() >= op.filter.items()
]
results.append(candidates[op.offset : op.offset + op.limit])
elif isinstance(op, PutOp):
if op.value is None:
self._data[op.namespace].pop(op.id, None)
elif op.id in self._data[op.namespace]:
self._data[op.namespace][op.id].value = op.value
self._data[op.namespace][op.id].updated_at = datetime.now(
timezone.utc
)
else:
self._data[op.namespace][op.id] = Item(
value=op.value,
scores={},
id=op.id,
namespace=op.namespace,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
last_accessed_at=datetime.now(timezone.utc),
)
results.append(None)
return results

async def abatch(self, ops: Iterable[Op]) -> list[Result]:
return self.batch(ops)
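
A short illustrative sketch (not part of the diff) of calling InMemoryStore.batch() directly with mixed operations, and of how namespace prefixes scope SearchOp results; the namespaces and payloads are made up.

from langgraph.store.base import GetOp, PutOp, SearchOp
from langgraph.store.memory import InMemoryStore

store = InMemoryStore()

# One batch() call can mix writes, reads, and searches; results come back
# in the same order as the operations.
results = store.batch(
    [
        PutOp(("users", "u1", "memories"), "m1", {"kind": "preference"}),
        PutOp(("users", "u2", "memories"), "m1", {"kind": "preference"}),
        GetOp(("users", "u1", "memories"), "m1"),
        SearchOp(("users",)),       # prefix matches both namespaces
        SearchOp(("users", "u2")),  # prefix matches only u2's namespace
    ]
)

assert results[0] is None and results[1] is None  # PutOp always yields None
assert results[2].namespace == ("users", "u1", "memories")
assert len(results[3]) == 2
assert [it.namespace for it in results[4]] == [("users", "u2", "memories")]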