Skip to content

Commit

Permalink
Add IVFFlat and HNSW support (#2598)
Browse files Browse the repository at this point in the history
It seems that actually once i moved the operators & other things out,
the query planner does do reasonable things and do sequential scanning
if filtered N < some size but the index otherwise, even with namespace
filtering.
  • Loading branch information
hinthornw authored Dec 3, 2024
1 parent fe538d4 commit 15f0765
Showing 1 changed file with 67 additions and 3 deletions.
70 changes: 67 additions & 3 deletions libs/checkpoint-postgres/langgraph/store/postgres/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Migration(NamedTuple):

sql: str
params: Optional[dict[str, Any]] = None
condition: Optional[Callable[["BasePostgresStore"], bool]] = None


MIGRATIONS: Sequence[str] = [
Expand Down Expand Up @@ -104,11 +105,29 @@ class Migration(NamedTuple):
),
},
),
# TODO: Add an HNSW or IVFFlat index depending on config
# First must improve the search query when filtering by
# namespace
Migration(
"""
CREATE INDEX IF NOT EXISTS store_vectors_embedding_idx ON store_vectors
USING %(index_type)s (embedding %(ops)s)%(index_params)s;
""",
condition=lambda store: bool(
store.index_config and _get_index_params(store)[0] != "flat"
),
params={
"index_type": lambda store: _get_index_params(store)[0],
"ops": lambda store: _get_vector_type_ops(store),
"index_params": lambda store: (
" WITH ("
+ ", ".join(f"{k}={v}" for k, v in _get_index_params(store)[1].items())
+ ")"
if _get_index_params(store)[1]
else ""
),
},
),
]


C = TypeVar("C", bound=Union[_pg_internal.Conn, _ainternal.Conn])


Expand Down Expand Up @@ -140,6 +159,8 @@ class PoolConfig(TypedDict, total=False):
class ANNIndexConfig(TypedDict, total=False):
"""Configuration for vector index in PostgreSQL store."""

kind: Literal["hnsw", "ivfflat", "flat"]
"""Type of index to use: 'hnsw' for Hierarchical Navigable Small World, or 'ivfflat' for Inverted File Flat."""
vector_type: Literal["vector", "halfvec"]
"""Type of vector storage to use.
Options:
Expand All @@ -148,6 +169,35 @@ class ANNIndexConfig(TypedDict, total=False):
"""


class HNSWConfig(ANNIndexConfig, total=False):
"""Configuration for HNSW (Hierarchical Navigable Small World) index."""

kind: Literal["hnsw"] # type: ignore[misc]
m: int
"""Maximum number of connections per layer. Default is 16."""
ef_construction: int
"""Size of dynamic candidate list for index construction. Default is 64."""


class IVFFlatConfig(ANNIndexConfig, total=False):
"""IVFFlat index divides vectors into lists, and then searches a subset of those lists that are closest to the query vector. It has faster build times and uses less memory than HNSW, but has lower query performance (in terms of speed-recall tradeoff).
Three keys to achieving good recall are:
1. Create the index after the table has some data
2. Choose an appropriate number of lists - a good place to start is rows / 1000 for up to 1M rows and sqrt(rows) for over 1M rows
3. When querying, specify an appropriate number of probes (higher is better for recall, lower is better for speed) - a good place to start is sqrt(lists)
"""

kind: Literal["ivfflat"] # type: ignore[misc]
nlist: int
"""Number of inverted lists (clusters) for IVF index.
Determines the number of clusters used in the index structure.
Higher values can improve search speed but increase index size and build time.
Typically set to the square root of the number of vectors in the index.
"""


class PostgresIndexConfig(IndexConfig, total=False):
"""Configuration for vector embeddings in PostgreSQL store with pgvector-specific options.
Expand Down Expand Up @@ -774,6 +824,8 @@ def _get_version(cur: Cursor[dict[str, Any]], table: str) -> int:
for v, migration in enumerate(
self.VECTOR_MIGRATIONS[version + 1 :], start=version + 1
):
if migration.condition and not migration.condition(self):
continue
sql = migration.sql
if migration.params:
params = {
Expand Down Expand Up @@ -832,6 +884,18 @@ def _get_vector_type_ops(store: BasePostgresStore) -> str:
return f"{type_prefix}_{distance_suffix}"


def _get_index_params(store: Any) -> tuple[str, dict[str, Any]]:
"""Get the index type and configuration based on config."""
if not store.index_config:
return "hnsw", {}

config = cast(PostgresIndexConfig, store.index_config)
index_config = config.get("ann_index_config", _DEFAULT_ANN_CONFIG).copy()
kind = index_config.pop("kind", "hnsw")
index_config.pop("vector_type", None)
return kind, index_config


def _namespace_to_text(
namespace: tuple[str, ...], handle_wildcards: bool = False
) -> str:
Expand Down

0 comments on commit 15f0765

Please sign in to comment.