diff --git a/libs/checkpoint-postgres/langgraph/store/postgres/base.py b/libs/checkpoint-postgres/langgraph/store/postgres/base.py index 94c7d3e4e..28edd8998 100644 --- a/libs/checkpoint-postgres/langgraph/store/postgres/base.py +++ b/libs/checkpoint-postgres/langgraph/store/postgres/base.py @@ -56,6 +56,7 @@ class Migration(NamedTuple): sql: str params: Optional[dict[str, Any]] = None + condition: Optional[Callable[["BasePostgresStore"], bool]] = None MIGRATIONS: Sequence[str] = [ @@ -104,11 +105,29 @@ class Migration(NamedTuple): ), }, ), - # TODO: Add an HNSW or IVFFlat index depending on config - # First must improve the search query when filtering by - # namespace + Migration( + """ +CREATE INDEX IF NOT EXISTS store_vectors_embedding_idx ON store_vectors + USING %(index_type)s (embedding %(ops)s)%(index_params)s; +""", + condition=lambda store: bool( + store.index_config and _get_index_params(store)[0] != "flat" + ), + params={ + "index_type": lambda store: _get_index_params(store)[0], + "ops": lambda store: _get_vector_type_ops(store), + "index_params": lambda store: ( + " WITH (" + + ", ".join(f"{k}={v}" for k, v in _get_index_params(store)[1].items()) + + ")" + if _get_index_params(store)[1] + else "" + ), + }, + ), ] + C = TypeVar("C", bound=Union[_pg_internal.Conn, _ainternal.Conn]) @@ -140,6 +159,8 @@ class PoolConfig(TypedDict, total=False): class ANNIndexConfig(TypedDict, total=False): """Configuration for vector index in PostgreSQL store.""" + kind: Literal["hnsw", "ivfflat", "flat"] + """Type of index to use: 'hnsw' for Hierarchical Navigable Small World, or 'ivfflat' for Inverted File Flat.""" vector_type: Literal["vector", "halfvec"] """Type of vector storage to use. Options: @@ -148,6 +169,35 @@ class ANNIndexConfig(TypedDict, total=False): """ +class HNSWConfig(ANNIndexConfig, total=False): + """Configuration for HNSW (Hierarchical Navigable Small World) index.""" + + kind: Literal["hnsw"] # type: ignore[misc] + m: int + """Maximum number of connections per layer. Default is 16.""" + ef_construction: int + """Size of dynamic candidate list for index construction. Default is 64.""" + + +class IVFFlatConfig(ANNIndexConfig, total=False): + """IVFFlat index divides vectors into lists, and then searches a subset of those lists that are closest to the query vector. It has faster build times and uses less memory than HNSW, but has lower query performance (in terms of speed-recall tradeoff). + + Three keys to achieving good recall are: + 1. Create the index after the table has some data + 2. Choose an appropriate number of lists - a good place to start is rows / 1000 for up to 1M rows and sqrt(rows) for over 1M rows + 3. When querying, specify an appropriate number of probes (higher is better for recall, lower is better for speed) - a good place to start is sqrt(lists) + """ + + kind: Literal["ivfflat"] # type: ignore[misc] + nlist: int + """Number of inverted lists (clusters) for IVF index. + + Determines the number of clusters used in the index structure. + Higher values can improve search speed but increase index size and build time. + Typically set to the square root of the number of vectors in the index. + """ + + class PostgresIndexConfig(IndexConfig, total=False): """Configuration for vector embeddings in PostgreSQL store with pgvector-specific options. @@ -774,6 +824,8 @@ def _get_version(cur: Cursor[dict[str, Any]], table: str) -> int: for v, migration in enumerate( self.VECTOR_MIGRATIONS[version + 1 :], start=version + 1 ): + if migration.condition and not migration.condition(self): + continue sql = migration.sql if migration.params: params = { @@ -832,6 +884,18 @@ def _get_vector_type_ops(store: BasePostgresStore) -> str: return f"{type_prefix}_{distance_suffix}" +def _get_index_params(store: Any) -> tuple[str, dict[str, Any]]: + """Get the index type and configuration based on config.""" + if not store.index_config: + return "hnsw", {} + + config = cast(PostgresIndexConfig, store.index_config) + index_config = config.get("ann_index_config", _DEFAULT_ANN_CONFIG).copy() + kind = index_config.pop("kind", "hnsw") + index_config.pop("vector_type", None) + return kind, index_config + + def _namespace_to_text( namespace: tuple[str, ...], handle_wildcards: bool = False ) -> str: