From 20f091a27710f9c0b0a799786f73bc73b13daaa9 Mon Sep 17 00:00:00 2001
From: William FH <13333726+hinthornw@users.noreply.github.com>
Date: Mon, 2 Dec 2024 17:08:24 -0800
Subject: [PATCH] [postgres] Sort Ascending (#2594)

Adds a few preliminaries:
1. Makes the returned "score" actually the result of the requested
   operation (cosine, inner_product, l2); see the score-mapping sketch
   below
2. Sorts ascending so that if you were to add an HNSW index (and had no
   WHERE filters), it would be used (see the query-shape sketch below)
3. Drops the inner WHERE statement if no namespace or other filters are
   provided. See (2) for why.
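Illustrative aside (not part of the diff): the inner CTE now sorts
ascending on the raw pgvector operator result, aliased as "neg_score",
and the outer query maps it back to the user-facing similarity score.
In Python terms, relying only on pgvector's documented operator
semantics ("<->" is L2 distance, "<#>" is negative inner product, "<=>"
is cosine distance):

    def user_score(distance_type: str, neg_score: float) -> float:
        # neg_score is what the operator computed in the CTE; for all
        # three metrics, ORDER BY neg_score ASC ranks best-first.
        if distance_type == "l2":
            return -neg_score  # post-operator "-scored.neg_score"
        elif distance_type == "inner_product":
            # <#> yields the *negative* inner product, so negating it
            # recovers the true inner product.
            return -neg_score  # post-operator "-(scored.neg_score)"
        else:  # cosine
            return 1 - neg_score  # post-operator "1 - scored.neg_score"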
I don't yet add an index to the migrations, since I think we first need
to agree on the right index settings to ensure it's actually used in
common query patterns.
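For reference, a simplified sketch of the index-eligible query shape
the rewrite aims for (assumptions: a psycopg connection "conn"; table
and operator names follow this package's schema). pgvector will only
consider an HNSW/IVFFlat index when the query orders ascending by the
bare operator expression, which is also why the inner WHERE clause is
dropped when there are no filters:

    def ann_friendly_top_k(conn, query_vec: str, k: int) -> list:
        # Bare operator + ASC + LIMIT is the pattern pgvector's ANN
        # indexes can serve; wrapping the operator (e.g. "1 - (...)")
        # or sorting DESC falls back to a sequential scan.
        sql = """
            SELECT prefix, key, embedding <=> %s::vector AS neg_score
            FROM store_vectors
            ORDER BY embedding <=> %s::vector ASC
            LIMIT %s
        """
        with conn.cursor() as cur:
            cur.execute(sql, (query_vec, query_vec, k))
            return cur.fetchall()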
---
 .../langgraph/store/postgres/base.py          | 43 ++++++---
 libs/checkpoint-postgres/tests/test_store.py  | 87 +++++++++++++++++++
 2 files changed, 118 insertions(+), 12 deletions(-)

diff --git a/libs/checkpoint-postgres/langgraph/store/postgres/base.py b/libs/checkpoint-postgres/langgraph/store/postgres/base.py
index 2a908c90e..94c7d3e4e 100644
--- a/libs/checkpoint-postgres/langgraph/store/postgres/base.py
+++ b/libs/checkpoint-postgres/langgraph/store/postgres/base.py
@@ -321,7 +321,7 @@ def _prepare_batch_search_queries(
         if op.query and self.index_config:
             embedding_requests.append((idx, op.query))
 
-            score_operator = _get_distance_operator(self)
+            score_operator, post_operator = _get_distance_operator(self)
             vector_type = (
                 cast(PostgresIndexConfig, self.index_config)
                 .get("ann_index_config", {})
@@ -351,18 +351,28 @@
                 if not filter_conditions
                 else " AND " + " AND ".join(filter_conditions)
             )
+            if op.namespace_prefix:
+                prefix_filter_str = f"WHERE s.prefix LIKE %s {filter_str} "
+                ns_args: Sequence = (f"{_namespace_to_text(op.namespace_prefix)}%",)
+            else:
+                ns_args = ()
+                if filter_str:
+                    prefix_filter_str = f"WHERE {filter_str} "
+                else:
+                    prefix_filter_str = ""
+
             base_query = f"""
                 WITH scored AS (
-                    SELECT s.prefix, s.key, s.value, s.created_at, s.updated_at, {score_operator} AS score
+                    SELECT s.prefix, s.key, s.value, s.created_at, s.updated_at, {score_operator} AS neg_score
                     FROM store s
                     JOIN store_vectors sv ON s.prefix = sv.prefix AND s.key = sv.key
-                    WHERE s.prefix LIKE %s {filter_str}
-                    ORDER BY {score_operator} DESC
+                    {prefix_filter_str}
+                    ORDER BY {score_operator} ASC
                     LIMIT %s
                 )
                 SELECT * FROM (
                     SELECT DISTINCT ON (prefix, key)
-                        prefix, key, value, created_at, updated_at, score
+                        prefix, key, value, created_at, updated_at, {post_operator} as score
                     FROM scored
                     ORDER BY prefix, key, score DESC
                 ) AS unique_docs
@@ -372,7 +382,7 @@
             """
             params = [
                 _PLACEHOLDER,  # Vector placeholder
-                f"{_namespace_to_text(op.namespace_prefix)}%",
+                *ns_args,
                 *filter_params,
                 _PLACEHOLDER,
                 expanded_limit,
@@ -702,7 +712,6 @@ def _batch_search_ops(
                 _paramslist[i] = embedding
 
         for (idx, _), (query, params) in zip(search_ops, queries):
-            # Execute the actual query
            cur.execute(query, params)
            rows = cast(list[Row], cur.fetchall())
            results[idx] = [
@@ -915,7 +924,7 @@ def _decode_ns_bytes(namespace: Union[str, bytes, list]) -> tuple[str, ...]:
     return tuple(namespace.split("."))
 
 
-def _get_distance_operator(store: Any) -> str:
+def _get_distance_operator(store: Any) -> tuple[str, str]:
     """Get the distance operator and score expression based on config."""
     # Note: Today, we are not using ANN indices due to restrictions
     # on PGVector's support for mixing vector and non-vector filters
@@ -936,12 +945,22 @@ def _get_distance_operator(store: Any) -> str:
     config = cast(PostgresIndexConfig, store.index_config)
     distance_type = config.get("distance_type", "cosine")
 
+    # Return the operator and the score expression.
+    # The operator is used in the CTE and is compatible with an ASCENDING
+    # ORDER BY clause.
+    # The score expression is used in the final query and is compatible
+    # with a DESCENDING ORDER BY clause and with the user's expectation of
+    # what the similarity score should be.
     if distance_type == "l2":
-        return "1 - (sv.embedding <-> %s::%s)"
+        # Final: "-(sv.embedding <-> %s::%s)"
+        # We return the negated L2 distance so the sort order still matches
+        return "sv.embedding <-> %s::%s", "-scored.neg_score"
     elif distance_type == "inner_product":
-        return "-(sv.embedding <#> %s::%s)"
-    else:  # cosine
-        return "1 - (sv.embedding <=> %s::%s)"
+        # Final: "-(sv.embedding <#> %s::%s)"
+        return "sv.embedding <#> %s::%s", "-(scored.neg_score)"
+    else:  # cosine similarity
+        # Final: "1 - (sv.embedding <=> %s::%s)"
+        return "sv.embedding <=> %s::%s", "1 - scored.neg_score"
 
 
 def _ensure_index_config(
diff --git a/libs/checkpoint-postgres/tests/test_store.py b/libs/checkpoint-postgres/tests/test_store.py
index c9d220fe0..35dfa2150 100644
--- a/libs/checkpoint-postgres/tests/test_store.py
+++ b/libs/checkpoint-postgres/tests/test_store.py
@@ -634,6 +634,7 @@ def test_embed_with_path_operation_config(
     distance_type: str,
 ) -> None:
     """Test operation-level field configuration for vector search."""
+
     with _create_vector_store(
         vector_type,
         distance_type,
@@ -695,3 +696,89 @@
     # assert len(results) == 3
     # doc5_result = next(r for r in results if r.key == "doc5")
     # assert doc5_result.score is None
+
+
+def _cosine_similarity(X: list[float], Y: list[list[float]]) -> list[float]:
+    """
+    Compute cosine similarity between a vector X and a matrix Y.
+    Implemented in pure Python to avoid a numpy dependency in tests.
+    """
+
+    similarities = []
+    for y in Y:
+        dot_product = sum(a * b for a, b in zip(X, y))
+        norm1 = sum(a * a for a in X) ** 0.5
+        norm2 = sum(a * a for a in y) ** 0.5
+        similarity = dot_product / (norm1 * norm2) if norm1 > 0 and norm2 > 0 else 0.0
+        similarities.append(similarity)
+
+    return similarities
+
+
+def _inner_product(X: list[float], Y: list[list[float]]) -> list[float]:
+    """
+    Compute inner product between a vector X and a matrix Y.
+    Implemented in pure Python to avoid a numpy dependency in tests.
+    """
+
+    similarities = []
+    for y in Y:
+        similarity = sum(a * b for a, b in zip(X, y))
+        similarities.append(similarity)
+
+    return similarities
+
+
+def _neg_l2_distance(X: list[float], Y: list[list[float]]) -> list[float]:
+    """
+    Compute the negative L2 distance between a vector X and a matrix Y.
+    Implemented in pure Python to avoid a numpy dependency in tests.
+    """
+
+    similarities = []
+    for y in Y:
+        similarity = sum((a - b) ** 2 for a, b in zip(X, y)) ** 0.5
+        similarities.append(-similarity)
+
+    return similarities
+
+
+@pytest.mark.parametrize(
+    "vector_type,distance_type",
+    [
+        ("vector", "cosine"),
+        ("vector", "inner_product"),
+        ("halfvec", "l2"),
+    ],
+)
+@pytest.mark.parametrize("query", ["aaa", "bbb", "ccc", "abcd", "poisson"])
+def test_scores(
+    fake_embeddings: CharacterEmbeddings,
+    vector_type: str,
+    distance_type: str,
+    query: str,
+) -> None:
+    """Test that returned scores match the requested distance metric."""
+    with _create_vector_store(
+        vector_type,
+        distance_type,
+        fake_embeddings,
+        text_fields=["key0"],
+    ) as store:
+        doc = {
+            "key0": "aaa",
+        }
+        store.put(("test",), "doc", doc, index=["key0", "key1"])
+
+        results = store.search((), query=query)
+        vec0 = fake_embeddings.embed_query(doc["key0"])
+        vec1 = fake_embeddings.embed_query(query)
+        if distance_type == "cosine":
+            similarities = _cosine_similarity(vec1, [vec0])
+        elif distance_type == "inner_product":
+            similarities = _inner_product(vec1, [vec0])
+        elif distance_type == "l2":
+            similarities = _neg_l2_distance(vec1, [vec0])
+
+        assert len(results) == 1
+        assert results[0].score == pytest.approx(similarities[0], abs=1e-3)
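Review aside (illustrative, not part of the patch): the user-visible
effect is that the returned item's score is now the metric value
itself. A minimal usage sketch follows; the DSN, the exact "index"
kwarg shape, and the toy embedding function are assumptions for
illustration, not code from this diff:

    from langgraph.store.postgres import PostgresStore

    def embed(texts: list[str]) -> list[list[float]]:
        # Toy embedding for illustration only.
        return [[float(len(t)), 1.0] for t in texts]

    with PostgresStore.from_conn_string(
        "postgresql://localhost:5432/postgres",  # assumed DSN
        index={"dims": 2, "embed": embed, "distance_type": "cosine"},
    ) as store:
        store.setup()
        store.put(("docs",), "doc-1", {"text": "hello"})
        hits = store.search(("docs",), query="hello")
        # With distance_type="cosine", hits[0].score is now the cosine
        # similarity itself, and results are still sorted best-first.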