Skip to content

Commit

Permalink
Lint and fix
Browse files Browse the repository at this point in the history
  • Loading branch information
hinthornw committed Dec 6, 2024
1 parent 4406828 commit 054ca2f
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 131 deletions.
13 changes: 2 additions & 11 deletions libs/checkpoint-postgres/langgraph/store/postgres/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
)
from langgraph.store.base.batch import (
BatchedBaseStore,
AsyncBatchedBaseStore,
SyncBatchedBaseStore,
)
from langgraph.store.postgres.base import (
_PLACEHOLDER,
Expand All @@ -40,7 +38,7 @@
logger = logging.getLogger(__name__)


class AsyncPostgresStore(AsyncBatchedBaseStore, BasePostgresStore[_ainternal.Conn]):
class AsyncPostgresStore(BatchedBaseStore, BasePostgresStore[_ainternal.Conn]):
"""Asynchronous Postgres-backed store with optional vector search using pgvector.
!!! example "Examples"
Expand Down Expand Up @@ -160,15 +158,8 @@ async def abatch(self, ops: Iterable[Op]) -> list[Result]:

return results

# def batch(self, ops: Iterable[Op]) -> list[Result]:
# return asyncio.run_coroutine_threadsafe(self.abatch(ops), self.loop).result()
def batch(self, ops: Iterable[Op]) -> list[Result]:
futures = []
for op in ops:
fut = self._loop.create_future()
self._aqueue[fut] = op
futures.append(fut)
return [fut.result() for fut in asyncio.as_completed(futures)]
return asyncio.run_coroutine_threadsafe(self.abatch(ops), self.loop).result()

@classmethod
@asynccontextmanager
Expand Down
4 changes: 2 additions & 2 deletions libs/checkpoint-postgres/langgraph/store/postgres/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from langgraph.checkpoint.postgres import _ainternal as _ainternal
from langgraph.checkpoint.postgres import _internal as _pg_internal
from langgraph.store.base import (
BaseStore,
GetOp,
IndexConfig,
Item,
Expand All @@ -44,6 +43,7 @@
get_text_at_path,
tokenize_path,
)
from langgraph.store.base.batch import SyncBatchedBaseStore

if TYPE_CHECKING:
from langchain_core.embeddings import Embeddings
Expand Down Expand Up @@ -533,7 +533,7 @@ def _get_filter_condition(self, key: str, op: str, value: Any) -> tuple[str, lis
raise ValueError(f"Unsupported operator: {op}")


class PostgresStore(BaseStore, BasePostgresStore[_pg_internal.Conn]):
class PostgresStore(SyncBatchedBaseStore, BasePostgresStore[_pg_internal.Conn]):
"""Postgres-backed store with optional vector search using pgvector.
!!! example "Examples"
Expand Down
177 changes: 84 additions & 93 deletions libs/checkpoint-postgres/tests/test_async_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import uuid
from collections.abc import AsyncIterator
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from typing import Any, Optional

Expand Down Expand Up @@ -64,104 +65,94 @@ async def store(request) -> AsyncIterator[AsyncPostgresStore]:
await conn.execute(f"DROP DATABASE {database}")


async def test_large_batches(store: AsyncPostgresStore) -> None:
N = 100
def test_large_batches(store: AsyncPostgresStore) -> None:
N = 1000
M = 10
coros = []
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=10) as executor:
for m in range(M):
ops = []
for i in range(N):
for i in range(N):
coros.append(
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
value={"foo": "bar" + str(i)},
)
)
coros.append(
executor.submit(
store.get,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
)
)
coros.append(
executor.submit(
store.list_namespaces,
prefix=None,
max_depth=m + 1,
)
)
coros.append(
executor.submit(
store.search,
("test",),
)
)
coros.append(
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
value={"foo": "bar" + str(i)},
)
)
coros.append(
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
None,
)
)
# ops.extend(
# [
# PutOp(
# namespace=("test", "foo", "bar", "baz", str(m % 2)),
# key=f"key{i}", # {m}",
# value=None,
# ),
# GetOp(namespace=("test",), key=f"key{i}{m}"),
# ListNamespacesOp(
# match_conditions=None, max_depth=i + 1, limit=m, offset=0
# ),
# SearchOp(
# namespace_prefix=("test",),
# filter=None,
# limit=10,
# offset=0,
# ),
# ]
#
# ops.extend(
# [
# # PutOp(
# # namespace=("test", "foo", "bar", "baz", str(m % 2)),
# # key=f"key{i}", # {m}",
# # value={"data": f"value{i}{m}"},
# # ),
# # GetOp(namespace=("test",), key=f"key{i}{m}"),
# # ListNamespacesOp(
# # match_conditions=None, max_depth=i + 1, limit=m, offset=0
# # ),
# # SearchOp(
# # namespace_prefix=("test",),
# # filter={"data": f"value{i}{m}"},
# # limit=10,
# # offset=0,
# # ),
# ]
# )

# coros.extend(ops)
# executor.map(store.batch, [[op] for op in coros])

# await asyncio.gather(*coros) # *[store.abatch(ops) for ops in coros])
_ = [
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
value={"foo": "bar" + str(i)},
),
executor.submit(
store.get,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
),
executor.submit(
store.list_namespaces,
prefix=None,
max_depth=m + 1,
),
executor.submit(
store.search,
("test",),
),
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
value={"foo": "bar" + str(i)},
),
executor.submit(
store.put,
("test", "foo", "bar", "baz", str(m % 2)),
f"key{i}",
None,
),
]


async def test_large_batches_async(store: AsyncPostgresStore) -> None:
    """Stress the async batching path with a large, interleaved workload.

    Builds M * N rounds of mixed put/get/list/search/delete coroutines and
    awaits them all concurrently so the store's internal batching layer has
    to coalesce many overlapping operations. ``asyncio.gather`` propagates
    the first exception, so any failing operation fails the test.
    """
    N = 1000
    M = 10
    pending = []
    for m in range(M):
        namespace = ("test", "foo", "bar", "baz", str(m % 2))
        for i in range(N):
            key = f"key{i}"
            # Same six-operation mix per (m, i) round as the sync variant:
            # put, get, list, search, overwrite-put, delete.
            pending.extend(
                [
                    store.aput(namespace, key, value={"foo": "bar" + str(i)}),
                    store.aget(namespace, key),
                    store.alist_namespaces(prefix=None, max_depth=m + 1),
                    store.asearch(("test",)),
                    store.aput(namespace, key, value={"foo": "bar" + str(i)}),
                    store.adelete(namespace, key),
                ]
            )

    await asyncio.gather(*pending)


async def test_abatch_order(store: AsyncPostgresStore) -> None:
Expand Down
98 changes: 93 additions & 5 deletions libs/checkpoint-postgres/tests/test_store.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# type: ignore

import asyncio
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from typing import Any, Optional
from uuid import uuid4
Expand All @@ -17,11 +19,7 @@
SearchOp,
)
from langgraph.store.postgres import PostgresStore
from tests.conftest import (
DEFAULT_URI,
VECTOR_TYPES,
CharacterEmbeddings,
)
from tests.conftest import DEFAULT_URI, VECTOR_TYPES, CharacterEmbeddings


@pytest.fixture(scope="function", params=["default", "pipe", "pool"])
Expand Down Expand Up @@ -59,6 +57,96 @@ def store(request) -> PostgresStore:
conn.execute(f"DROP DATABASE {database}")


def test_large_batches(store: PostgresStore) -> None:
    """Stress the synchronous store with a large threaded workload.

    Fires M * N rounds of mixed put/get/list/search/delete calls from a
    thread pool so the sync batching layer has to coalesce many overlapping
    operations, then re-raises any worker exception.
    """
    N = 1000
    M = 10
    futures = []

    with ThreadPoolExecutor(max_workers=10) as executor:
        for m in range(M):
            for i in range(N):
                # Same six-operation mix per (m, i) round as the async
                # variant: put, get, list, search, overwrite-put, delete
                # (a put with value=None deletes the key).
                futures.extend(
                    [
                        executor.submit(
                            store.put,
                            ("test", "foo", "bar", "baz", str(m % 2)),
                            f"key{i}",
                            value={"foo": "bar" + str(i)},
                        ),
                        executor.submit(
                            store.get,
                            ("test", "foo", "bar", "baz", str(m % 2)),
                            f"key{i}",
                        ),
                        executor.submit(
                            store.list_namespaces,
                            prefix=None,
                            max_depth=m + 1,
                        ),
                        executor.submit(
                            store.search,
                            ("test",),
                        ),
                        executor.submit(
                            store.put,
                            ("test", "foo", "bar", "baz", str(m % 2)),
                            f"key{i}",
                            value={"foo": "bar" + str(i)},
                        ),
                        executor.submit(
                            store.put,
                            ("test", "foo", "bar", "baz", str(m % 2)),
                            f"key{i}",
                            None,
                        ),
                    ]
                )

    # Bug fix: the original discarded every future (`_ = [...]`), so an
    # exception raised inside a worker thread was silently swallowed and the
    # test could never fail. Calling result() re-raises any such exception.
    for fut in futures:
        fut.result()


async def test_large_batches_async(store: PostgresStore) -> None:
    """Stress the store's async methods with a large, interleaved workload.

    Schedules M * N rounds of mixed put/get/list/search/delete coroutines
    and awaits them all at once; ``asyncio.gather`` surfaces the first
    exception, so any failing operation fails the test.
    """
    N = 1000
    M = 10

    def round_ops(m: int, i: int) -> list:
        # One round: put, get, list, search, overwrite-put, delete —
        # the same six-operation mix as the threaded variant.
        ns = ("test", "foo", "bar", "baz", str(m % 2))
        key = f"key{i}"
        return [
            store.aput(ns, key, value={"foo": "bar" + str(i)}),
            store.aget(ns, key),
            store.alist_namespaces(prefix=None, max_depth=m + 1),
            store.asearch(("test",)),
            store.aput(ns, key, value={"foo": "bar" + str(i)}),
            store.adelete(ns, key),
        ]

    pending = [
        coro for m in range(M) for i in range(N) for coro in round_ops(m, i)
    ]
    await asyncio.gather(*pending)


def test_batch_order(store: PostgresStore) -> None:
# Setup test data
store.put(("test", "foo"), "key1", {"data": "value1"})
Expand Down
Loading

0 comments on commit 054ca2f

Please sign in to comment.