Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wfh/scaling #2668

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions libs/checkpoint-postgres/langgraph/store/postgres/aio.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import asyncio

Check notice on line 1 in libs/checkpoint-postgres/langgraph/store/postgres/aio.py

View workflow job for this annotation

GitHub Actions / benchmark

Benchmark results

......................................... fanout_to_subgraph_10x: Mean +- std dev: 60.0 ms +- 1.2 ms ......................................... fanout_to_subgraph_10x_sync: Mean +- std dev: 51.0 ms +- 0.8 ms ......................................... fanout_to_subgraph_10x_checkpoint: Mean +- std dev: 92.5 ms +- 9.0 ms ......................................... fanout_to_subgraph_10x_checkpoint_sync: Mean +- std dev: 94.1 ms +- 1.0 ms ......................................... fanout_to_subgraph_100x: Mean +- std dev: 609 ms +- 32 ms ......................................... fanout_to_subgraph_100x_sync: Mean +- std dev: 496 ms +- 5 ms ......................................... fanout_to_subgraph_100x_checkpoint: Mean +- std dev: 963 ms +- 40 ms ......................................... fanout_to_subgraph_100x_checkpoint_sync: Mean +- std dev: 920 ms +- 20 ms ......................................... react_agent_10x: Mean +- std dev: 30.7 ms +- 0.6 ms ......................................... react_agent_10x_sync: Mean +- std dev: 22.4 ms +- 0.3 ms ......................................... react_agent_10x_checkpoint: Mean +- std dev: 46.7 ms +- 0.9 ms ......................................... react_agent_10x_checkpoint_sync: Mean +- std dev: 38.8 ms +- 3.7 ms ......................................... react_agent_100x: Mean +- std dev: 342 ms +- 6 ms ......................................... react_agent_100x_sync: Mean +- std dev: 273 ms +- 4 ms ......................................... react_agent_100x_checkpoint: Mean +- std dev: 951 ms +- 15 ms ......................................... react_agent_100x_checkpoint_sync: Mean +- std dev: 842 ms +- 13 ms ......................................... wide_state_25x300: Mean +- std dev: 23.2 ms +- 0.4 ms ......................................... WARNING: the benchmark result may be unstable * the standard deviation (1.96 ms) is 12% of the mean (15.8 ms) Try to rerun the benchmark with more runs, values and/or loops. 
Run 'python -m pyperf system tune' command to reduce the system jitter. Use pyperf stats, pyperf dump and pyperf hist to analyze results. Use --quiet option to hide these warnings. wide_state_25x300_sync: Mean +- std dev: 15.8 ms +- 2.0 ms ......................................... wide_state_25x300_checkpoint: Mean +- std dev: 290 ms +- 17 ms ......................................... wide_state_25x300_checkpoint_sync: Mean +- std dev: 275 ms +- 16 ms ......................................... wide_state_15x600: Mean +- std dev: 27.1 ms +- 0.5 ms ......................................... wide_state_15x600_sync: Mean +- std dev: 16.6 ms +- 0.2 ms ......................................... wide_state_15x600_checkpoint: Mean +- std dev: 489 ms +- 16 ms ......................................... wide_state_15x600_checkpoint_sync: Mean +- std dev: 473 ms +- 15 ms ......................................... wide_state_9x1200: Mean +- std dev: 27.0 ms +- 0.5 ms ......................................... wide_state_9x1200_sync: Mean +- std dev: 16.5 ms +- 0.3 ms ......................................... wide_state_9x1200_checkpoint: Mean +- std dev: 322 ms +- 15 ms ......................................... wide_state_9x1200_checkpoint_sync: Mean +- std dev: 307 ms +- 15 ms

Check notice on line 1 in libs/checkpoint-postgres/langgraph/store/postgres/aio.py

View workflow job for this annotation

GitHub Actions / benchmark

Comparison against main

+------------------------------------+---------+-----------------------+ | Benchmark | main | changes | +====================================+=========+=======================+ | react_agent_10x | 31.0 ms | 30.7 ms: 1.01x faster | +------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x_sync | 494 ms | 496 ms: 1.01x slower | +------------------------------------+---------+-----------------------+ | react_agent_100x_sync | 271 ms | 273 ms: 1.01x slower | +------------------------------------+---------+-----------------------+ | react_agent_100x_checkpoint | 936 ms | 951 ms: 1.02x slower | +------------------------------------+---------+-----------------------+ | fanout_to_subgraph_100x_checkpoint | 941 ms | 963 ms: 1.02x slower | +------------------------------------+---------+-----------------------+ | Geometric mean | (ref) | 1.00x slower | +------------------------------------+---------+-----------------------+ Benchmark hidden because not significant (23): fanout_to_subgraph_100x, react_agent_10x_checkpoint_sync, fanout_to_subgraph_100x_checkpoint_sync, react_agent_100x, react_agent_100x_checkpoint_sync, react_agent_10x_sync, react_agent_10x_checkpoint, wide_state_15x600_checkpoint, wide_state_15x600, wide_state_25x300, wide_state_9x1200, wide_state_15x600_checkpoint_sync, wide_state_25x300_checkpoint_sync, wide_state_15x600_sync, fanout_to_subgraph_10x, fanout_to_subgraph_10x_checkpoint_sync, wide_state_9x1200_sync, fanout_to_subgraph_10x_sync, fanout_to_subgraph_10x_checkpoint, wide_state_9x1200_checkpoint_sync, wide_state_9x1200_checkpoint, wide_state_25x300_checkpoint, wide_state_25x300_sync
import logging
from collections.abc import AsyncIterator, Iterable, Sequence
from contextlib import asynccontextmanager
Expand All @@ -19,7 +19,9 @@
Result,
SearchOp,
)
from langgraph.store.base.batch import AsyncBatchedBaseStore
from langgraph.store.base.batch import (
BatchedBaseStore,
)
from langgraph.store.postgres.base import (
_PLACEHOLDER,
BasePostgresStore,
Expand All @@ -36,7 +38,7 @@
logger = logging.getLogger(__name__)


class AsyncPostgresStore(AsyncBatchedBaseStore, BasePostgresStore[_ainternal.Conn]):
class AsyncPostgresStore(BatchedBaseStore, BasePostgresStore[_ainternal.Conn]):
"""Asynchronous Postgres-backed store with optional vector search using pgvector.

!!! example "Examples"
Expand Down
4 changes: 2 additions & 2 deletions libs/checkpoint-postgres/langgraph/store/postgres/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from langgraph.checkpoint.postgres import _ainternal as _ainternal
from langgraph.checkpoint.postgres import _internal as _pg_internal
from langgraph.store.base import (
BaseStore,
GetOp,
IndexConfig,
Item,
Expand All @@ -44,6 +43,7 @@
get_text_at_path,
tokenize_path,
)
from langgraph.store.base.batch import SyncBatchedBaseStore

if TYPE_CHECKING:
from langchain_core.embeddings import Embeddings
Expand Down Expand Up @@ -533,7 +533,7 @@ def _get_filter_condition(self, key: str, op: str, value: Any) -> tuple[str, lis
raise ValueError(f"Unsupported operator: {op}")


class PostgresStore(BaseStore, BasePostgresStore[_pg_internal.Conn]):
class PostgresStore(SyncBatchedBaseStore, BasePostgresStore[_pg_internal.Conn]):
"""Postgres-backed store with optional vector search using pgvector.

!!! example "Examples"
Expand Down
92 changes: 92 additions & 0 deletions libs/checkpoint-postgres/tests/test_async_store.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# type: ignore
import asyncio
import itertools
import sys
import uuid
from collections.abc import AsyncIterator
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from typing import Any, Optional

Expand Down Expand Up @@ -63,6 +65,96 @@ async def store(request) -> AsyncIterator[AsyncPostgresStore]:
await conn.execute(f"DROP DATABASE {database}")


def test_large_batches(store: AsyncPostgresStore) -> None:
N = 1000
M = 10

with ThreadPoolExecutor(max_workers=10) as executor:
for m in range(M):
for i in range(N):
_ = [
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
value={"foo": "bar" + str(i)},
),
executor.submit(
store.get,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
),
executor.submit(
store.list_namespaces,
prefix=None,
max_depth=m + 1,
),
executor.submit(
store.search,
("test",),
),
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
value={"foo": "bar" + str(i)},
),
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
None,
),
]


async def test_large_batches_async(store: AsyncPostgresStore) -> None:
    """Fire a large burst of concurrent async store operations.

    Builds thousands of aput/aget/alist_namespaces/asearch/adelete coroutines
    up front — alternating between two namespaces — and awaits them all
    together, exercising the async batching path. Any failure propagates out
    of ``asyncio.gather``.
    """
    num_keys = 1000
    num_rounds = 10

    pending = []
    for round_idx in range(num_rounds):
        namespace = ("test", "foo", "bar", "baz", str(round_idx % 2))
        for key_idx in range(num_keys):
            key = f"key{key_idx}"
            pending.extend(
                [
                    store.aput(namespace, key, value={"foo": "bar" + str(key_idx)}),
                    store.aget(namespace, key),
                    store.alist_namespaces(prefix=None, max_depth=round_idx + 1),
                    store.asearch(("test",)),
                    store.aput(namespace, key, value={"foo": "bar" + str(key_idx)}),
                    store.adelete(namespace, key),
                ]
            )

    await asyncio.gather(*pending)


async def test_abatch_order(store: AsyncPostgresStore) -> None:
# Setup test data
await store.aput(("test", "foo"), "key1", {"data": "value1"})
Expand Down
98 changes: 93 additions & 5 deletions libs/checkpoint-postgres/tests/test_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# type: ignore

import asyncio
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Any, Optional
from uuid import uuid4
Expand All @@ -17,11 +19,7 @@
SearchOp,
)
from langgraph.store.postgres import PostgresStore
from tests.conftest import (
DEFAULT_URI,
VECTOR_TYPES,
CharacterEmbeddings,
)
from tests.conftest import DEFAULT_URI, VECTOR_TYPES, CharacterEmbeddings


@pytest.fixture(scope="function", params=["default", "pipe", "pool"])
Expand Down Expand Up @@ -59,6 +57,96 @@ def store(request) -> PostgresStore:
conn.execute(f"DROP DATABASE {database}")


def test_large_batches(store: PostgresStore) -> None:
N = 1000
M = 10

with ThreadPoolExecutor(max_workers=10) as executor:
for m in range(M):
for i in range(N):
_ = [
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
value={"foo": "bar" + str(i)},
),
executor.submit(
store.get,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
),
executor.submit(
store.list_namespaces,
prefix=None,
max_depth=m + 1,
),
executor.submit(
store.search,
("test",),
),
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
value={"foo": "bar" + str(i)},
),
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
None,
),
]


async def test_large_batches_async(store: PostgresStore) -> None:
    """Fire a large burst of concurrent async store operations.

    Builds thousands of aput/aget/alist_namespaces/asearch/adelete coroutines
    up front — alternating between two namespaces — and awaits them all
    together, exercising the async batching path. Any failure propagates out
    of ``asyncio.gather``.
    """
    num_keys = 1000
    num_rounds = 10

    pending = []
    for round_idx in range(num_rounds):
        namespace = ("test", "foo", "bar", "baz", str(round_idx % 2))
        for key_idx in range(num_keys):
            key = f"key{key_idx}"
            pending.extend(
                [
                    store.aput(namespace, key, value={"foo": "bar" + str(key_idx)}),
                    store.aget(namespace, key),
                    store.alist_namespaces(prefix=None, max_depth=round_idx + 1),
                    store.asearch(("test",)),
                    store.aput(namespace, key, value={"foo": "bar" + str(key_idx)}),
                    store.adelete(namespace, key),
                ]
            )

    await asyncio.gather(*pending)


def test_batch_order(store: PostgresStore) -> None:
# Setup test data
store.put(("test", "foo"), "key1", {"data": "value1"})
Expand Down
4 changes: 4 additions & 0 deletions libs/checkpoint/langgraph/store/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,8 @@ def list_namespaces(
# [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
```
"""
if max_depth is not None and max_depth <= 0:
raise ValueError("If provided, max_depth must be greater than 0")
match_conditions = []
if prefix:
match_conditions.append(MatchCondition(match_type="prefix", path=prefix))
Expand Down Expand Up @@ -1004,6 +1006,8 @@ async def alist_namespaces(
# Returns: [("a", "b", "c"), ("a", "b", "d"), ("a", "b", "f")]
```
"""
if max_depth is not None and max_depth <= 0:
raise ValueError("If provided, max_depth must be greater than 0")
match_conditions = []
if prefix:
match_conditions.append(MatchCondition(match_type="prefix", path=prefix))
Expand Down
Loading
Loading