diff --git a/libs/checkpoint/langgraph/store/base.py b/libs/checkpoint/langgraph/store/base.py index c4b6bd552f..c51eba14a0 100644 --- a/libs/checkpoint/langgraph/store/base.py +++ b/libs/checkpoint/langgraph/store/base.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, Iterable, Literal, NamedTuple, Optional, Union +from typing import Any, Iterable, Literal, NamedTuple, Optional, Union, cast class Item: @@ -35,9 +35,19 @@ def __init__( ): self.value = value self.key = key + # The casting from json-like types is for if this object is + # deserialized. self.namespace = tuple(namespace) - self.created_at = created_at - self.updated_at = updated_at + self.created_at = ( + datetime.fromisoformat(cast(str, created_at)) + if isinstance(created_at, str) + else created_at + ) + self.updated_at = ( + datetime.fromisoformat(cast(str, created_at)) + if isinstance(updated_at, str) + else updated_at + ) def __eq__(self, other: object) -> bool: if not isinstance(other, Item): @@ -53,6 +63,15 @@ def __eq__(self, other: object) -> bool: def __hash__(self) -> int: return hash((self.namespace, self.key)) + def dict(self) -> dict: + return { + "value": self.value, + "key": self.key, + "namespace": list(self.namespace), + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + } + class GetOp(NamedTuple): """Operation to retrieve an item by namespace and key.""" diff --git a/libs/sdk-py/langgraph_sdk/client.py b/libs/sdk-py/langgraph_sdk/client.py index 9125f8bb18..308a948e1c 100644 --- a/libs/sdk-py/langgraph_sdk/client.py +++ b/libs/sdk-py/langgraph_sdk/client.py @@ -11,6 +11,7 @@ Iterator, List, Optional, + Sequence, Union, overload, ) @@ -29,12 +30,15 @@ Cron, DisconnectMode, GraphSchema, + Item, Json, + ListNamespaceResponse, MultitaskStrategy, OnCompletionBehavior, OnConflictBehavior, Run, RunCreate, + SearchItemsResponse, StreamMode, StreamPart, Subgraphs, @@ -143,6 +147,7 @@ def __init__(self, client: httpx.AsyncClient) -> None: self.threads = ThreadsClient(self.http) self.runs = RunsClient(self.http) self.crons = CronClient(self.http) + self.store = StoreClient(self.http) class HttpClient: @@ -211,9 +216,9 @@ async def patch(self, path: str, *, json: dict) -> Any: raise e return await adecode_json(r) - async def delete(self, path: str) -> None: + async def delete(self, path: str, *, json: Optional[Any] = None) -> None: """Make a DELETE request.""" - r = await self.client.delete(path) + r = await self.client.request("DELETE", path, json=json) try: r.raise_for_status() except httpx.HTTPStatusError as e: @@ -1874,6 +1879,205 @@ async def search( return await self.http.post("/runs/crons/search", json=payload) +class StoreClient: + def __init__(self, http: HttpClient) -> None: + self.http = http + + async def put_item( + self, namespace: Sequence[str], /, key: str, value: dict[str, Any] + ) -> None: + """Store or update an item. + + Args: + namespace: A list of strings representing the namespace path. + key: The unique identifier for the item within the namespace. + value: A dictionary containing the item's data. + + Returns: + None + + Example Usage: + + await client.store.put_item( + ["documents", "user123"], + key="item456", + value={"title": "My Document", "content": "Hello World"} + ) + """ + for label in namespace: + if "." in label: + raise ValueError( + f"Invalid namespace label '{label}'. Namespace labels cannot contain periods ('.')." + ) + payload = { + "namespace": namespace, + "key": key, + "value": value, + } + await self.http.put("/store/items", json=payload) + + async def get_item(self, namespace: Sequence[str], /, key: str) -> Item: + """Retrieve a single item. + + Args: + key: The unique identifier for the item. + namespace: Optional list of strings representing the namespace path. + + Returns: + Item: The retrieved item. + + Example Usage: + + item = await client.store.get_item( + ["documents", "user123"], + key="item456", + ) + print(item) + + ---------------------------------------------------------------- + + { + 'namespace': ['documents', 'user123'], + 'key': 'item456', + 'value': {'title': 'My Document', 'content': 'Hello World'}, + 'created_at': '2024-07-30T12:00:00Z', + 'updated_at': '2024-07-30T12:00:00Z' + } + """ + for label in namespace: + if "." in label: + raise ValueError( + f"Invalid namespace label '{label}'. Namespace labels cannot contain periods ('.')." + ) + return await self.http.get( + "/store/items", params={"namespace": namespace, "key": key} + ) + + async def delete_item(self, namespace: Sequence[str], /, key: str) -> None: + """Delete an item. + + Args: + key: The unique identifier for the item. + namespace: Optional list of strings representing the namespace path. + + Returns: + None + + Example Usage: + + await client.store.delete_item( + ["documents", "user123"], + key="item456", + ) + """ + await self.http.delete( + "/store/items", json={"namespace": namespace, "key": key} + ) + + async def search_items( + self, + namespace_prefix: Sequence[str], + /, + filter: Optional[dict[str, Any]] = None, + limit: int = 10, + offset: int = 0, + ) -> SearchItemsResponse: + """Search for items within a namespace prefix. + + Args: + namespace_prefix: List of strings representing the namespace prefix. + filter: Optional dictionary of key-value pairs to filter results. + limit: Maximum number of items to return (default is 10). + offset: Number of items to skip before returning results (default is 0). + + Returns: + List[Item]: A list of items matching the search criteria. + + Example Usage: + + items = await client.store.search_items( + ["documents"], + filter={"author": "John Doe"}, + limit=5, + offset=0 + ) + print(items) + + ---------------------------------------------------------------- + + { + "items": [ + { + "namespace": ["documents", "user123"], + "key": "item789", + "value": { + "title": "Another Document", + "author": "John Doe" + }, + "created_at": "2024-07-30T12:00:00Z", + "updated_at": "2024-07-30T12:00:00Z" + }, + # ... additional items ... + ] + } + """ + payload = { + "namespace_prefix": namespace_prefix, + "filter": filter, + "limit": limit, + "offset": offset, + } + + return await self.http.post("/store/items/search", json=_provided_vals(payload)) + + async def list_namespaces( + self, + prefix: Optional[List[str]] = None, + suffix: Optional[List[str]] = None, + max_depth: Optional[int] = None, + limit: int = 100, + offset: int = 0, + ) -> ListNamespaceResponse: + """List namespaces with optional match conditions. + + Args: + prefix: Optional list of strings representing the prefix to filter namespaces. + suffix: Optional list of strings representing the suffix to filter namespaces. + max_depth: Optional integer specifying the maximum depth of namespaces to return. + limit: Maximum number of namespaces to return (default is 100). + offset: Number of namespaces to skip before returning results (default is 0). + + Returns: + List[List[str]]: A list of namespaces matching the criteria. + + Example Usage: + + namespaces = await client.store.list_namespaces( + prefix=["documents"], + max_depth=3, + limit=10, + offset=0 + ) + print(namespaces) + + ---------------------------------------------------------------- + + [ + ["documents", "user123", "reports"], + ["documents", "user456", "invoices"], + ... + ] + """ + payload = { + "prefix": prefix, + "suffix": suffix, + "max_depth": max_depth, + "limit": limit, + "offset": offset, + } + return await self.http.post("/store/namespaces", json=_provided_vals(payload)) + + def get_sync_client( *, url: Optional[str] = None, @@ -1913,6 +2117,7 @@ def __init__(self, client: httpx.Client) -> None: self.threads = SyncThreadsClient(self.http) self.runs = SyncRunsClient(self.http) self.crons = SyncCronClient(self.http) + self.store = SyncStoreClient(self.http) class SyncHttpClient: @@ -1981,9 +2186,9 @@ def patch(self, path: str, *, json: dict) -> Any: raise e return decode_json(r) - def delete(self, path: str) -> None: + def delete(self, path: str, *, json: Optional[Any] = None) -> None: """Make a DELETE request.""" - r = self.client.delete(path) + r = self.client.request("DELETE", path, json=json) try: r.raise_for_status() except httpx.HTTPStatusError as e: @@ -3628,3 +3833,204 @@ def search( } payload = {k: v for k, v in payload.items() if v is not None} return self.http.post("/runs/crons/search", json=payload) + + +class SyncStoreClient: + def __init__(self, http: SyncHttpClient) -> None: + self.http = http + + def put_item( + self, namespace: Sequence[str], /, key: str, value: dict[str, Any] + ) -> None: + """Store or update an item. + + Args: + namespace: A list of strings representing the namespace path. + key: The unique identifier for the item within the namespace. + value: A dictionary containing the item's data. + + Returns: + None + + Example Usage: + + client.store.put_item( + ["documents", "user123"], + key="item456", + value={"title": "My Document", "content": "Hello World"} + ) + """ + for label in namespace: + if "." in label: + raise ValueError( + f"Invalid namespace label '{label}'. Namespace labels cannot contain periods ('.')." + ) + payload = { + "namespace": namespace, + "key": key, + "value": value, + } + self.http.put("/store/items", json=payload) + + def get_item(self, namespace: Sequence[str], /, key: str) -> Item: + """Retrieve a single item. + + Args: + key: The unique identifier for the item. + namespace: Optional list of strings representing the namespace path. + + Returns: + Item: The retrieved item. + + Example Usage: + + item = client.store.get_item( + ["documents", "user123"], + key="item456", + ) + print(item) + + ---------------------------------------------------------------- + + { + 'namespace': ['documents', 'user123'], + 'key': 'item456', + 'value': {'title': 'My Document', 'content': 'Hello World'}, + 'created_at': '2024-07-30T12:00:00Z', + 'updated_at': '2024-07-30T12:00:00Z' + } + """ + for label in namespace: + if "." in label: + raise ValueError( + f"Invalid namespace label '{label}'. Namespace labels cannot contain periods ('.')." + ) + + return self.http.get( + "/store/items", params={"key": key, "namespace": ".".join(namespace)} + ) + + def delete_item(self, namespace: Sequence[str], /, key: str) -> None: + """Delete an item. + + Args: + key: The unique identifier for the item. + namespace: Optional list of strings representing the namespace path. + + Returns: + None + + Example Usage: + + client.store.delete_item( + ["documents", "user123"], + key="item456", + ) + """ + self.http.delete("/store/items", json={"key": key, "namespace": namespace}) + + def search_items( + self, + namespace_prefix: Sequence[str], + /, + filter: Optional[dict[str, Any]] = None, + limit: int = 10, + offset: int = 0, + ) -> SearchItemsResponse: + """Search for items within a namespace prefix. + + Args: + namespace_prefix: List of strings representing the namespace prefix. + filter: Optional dictionary of key-value pairs to filter results. + limit: Maximum number of items to return (default is 10). + offset: Number of items to skip before returning results (default is 0). + + Returns: + List[Item]: A list of items matching the search criteria. + + Example Usage: + + items = client.store.search_items( + ["documents"], + filter={"author": "John Doe"}, + limit=5, + offset=0 + ) + print(items) + + ---------------------------------------------------------------- + + { + "items": [ + { + "namespace": ["documents", "user123"], + "key": "item789", + "value": { + "title": "Another Document", + "author": "John Doe" + }, + "created_at": "2024-07-30T12:00:00Z", + "updated_at": "2024-07-30T12:00:00Z" + }, + # ... additional items ... + ] + } + """ + payload = { + "namespace_prefix": namespace_prefix, + "filter": filter, + "limit": limit, + "offset": offset, + } + return self.http.post("/store/items/search", json=_provided_vals(payload)) + + def list_namespaces( + self, + prefix: Optional[List[str]] = None, + suffix: Optional[List[str]] = None, + max_depth: Optional[int] = None, + limit: int = 100, + offset: int = 0, + ) -> ListNamespaceResponse: + """List namespaces with optional match conditions. + + Args: + prefix: Optional list of strings representing the prefix to filter namespaces. + suffix: Optional list of strings representing the suffix to filter namespaces. + max_depth: Optional integer specifying the maximum depth of namespaces to return. + limit: Maximum number of namespaces to return (default is 100). + offset: Number of namespaces to skip before returning results (default is 0). + + Returns: + List[List[str]]: A list of namespaces matching the criteria. + + Example Usage: + + namespaces = client.store.list_namespaces( + prefix=["documents"], + max_depth=3, + limit=10, + offset=0 + ) + print(namespaces) + + ---------------------------------------------------------------- + + [ + ["documents", "user123", "reports"], + ["documents", "user456", "invoices"], + ... + ] + """ + payload = { + "prefix": prefix, + "suffix": suffix, + "max_depth": max_depth, + "limit": limit, + "offset": offset, + } + return self.http.post("/store/namespaces", json=_provided_vals(payload)) + + +def _provided_vals(d: dict): + return {k: v for k, v in d.items() if v is not None} diff --git a/libs/sdk-py/langgraph_sdk/schema.py b/libs/sdk-py/langgraph_sdk/schema.py index da1db204c8..2ea166aef7 100644 --- a/libs/sdk-py/langgraph_sdk/schema.py +++ b/libs/sdk-py/langgraph_sdk/schema.py @@ -197,6 +197,30 @@ class RunCreate(TypedDict): multitask_strategy: Optional[MultitaskStrategy] +class Item(TypedDict): + namespace: list[str] + """The namespace of the item.""" + key: str + """The unique identifier of the item within its namespace. + + In general, keys are not globally unique. + """ + value: dict[str, Any] + """The value stored in the item. This is the document itself.""" + created_at: datetime + """The timestamp when the item was created.""" + updated_at: datetime + """The timestamp when the item was last updated.""" + + +class ListNamespaceResponse(TypedDict): + namespaces: list[list[str]] + + +class SearchItemsResponse(TypedDict): + items: list[Item] + + class StreamPart(NamedTuple): event: str data: dict