diff --git a/libs/checkpoint-postgres/langgraph/store/postgres/base.py b/libs/checkpoint-postgres/langgraph/store/postgres/base.py
index 2a908c90e..94c7d3e4e 100644
--- a/libs/checkpoint-postgres/langgraph/store/postgres/base.py
+++ b/libs/checkpoint-postgres/langgraph/store/postgres/base.py
@@ -321,7 +321,7 @@ def _prepare_batch_search_queries(
         if op.query and self.index_config:
             embedding_requests.append((idx, op.query))
-            score_operator = _get_distance_operator(self)
+            score_operator, post_operator = _get_distance_operator(self)
             vector_type = (
                 cast(PostgresIndexConfig, self.index_config)
                 .get("ann_index_config", {})
@@ -351,18 +351,28 @@ def _prepare_batch_search_queries(
                 if not filter_conditions
                 else " AND " + " AND ".join(filter_conditions)
             )
+            if op.namespace_prefix:
+                prefix_filter_str = f"WHERE s.prefix LIKE %s {filter_str} "
+                ns_args: Sequence = (f"{_namespace_to_text(op.namespace_prefix)}%",)
+            else:
+                ns_args = ()
+                if filter_str:
+                    prefix_filter_str = f"WHERE {filter_str.removeprefix(' AND ')} "
+                else:
+                    prefix_filter_str = ""
+
             base_query = f"""
                 WITH scored AS (
-                    SELECT s.prefix, s.key, s.value, s.created_at, s.updated_at, {score_operator} AS score
+                    SELECT s.prefix, s.key, s.value, s.created_at, s.updated_at, {score_operator} AS neg_score
                     FROM store s
                     JOIN store_vectors sv ON s.prefix = sv.prefix AND s.key = sv.key
-                    WHERE s.prefix LIKE %s {filter_str}
-                    ORDER BY {score_operator} DESC
+                    {prefix_filter_str}
+                    ORDER BY {score_operator} ASC
                     LIMIT %s
                 )
                 SELECT * FROM (
                     SELECT DISTINCT ON (prefix, key)
-                        prefix, key, value, created_at, updated_at, score
+                        prefix, key, value, created_at, updated_at, {post_operator} as score
                     FROM scored
                     ORDER BY prefix, key, score DESC
                 ) AS unique_docs
@@ -372,7 +382,7 @@ def _prepare_batch_search_queries(
             """
             params = [
                 _PLACEHOLDER,  # Vector placeholder
-                f"{_namespace_to_text(op.namespace_prefix)}%",
+                *ns_args,
                 *filter_params,
                 _PLACEHOLDER,
                 expanded_limit,
@@ -702,7 +712,6 @@ def _batch_search_ops(
             _paramslist[i] = embedding
     for (idx, _),
 (query, params) in zip(search_ops, queries):
-        # Execute the actual query
         cur.execute(query, params)
         rows = cast(list[Row], cur.fetchall())
         results[idx] = [
@@ -915,7 +924,7 @@ def _decode_ns_bytes(namespace: Union[str, bytes, list]) -> tuple[str, ...]:
     return tuple(namespace.split("."))
 
 
-def _get_distance_operator(store: Any) -> str:
+def _get_distance_operator(store: Any) -> tuple[str, str]:
     """Get the distance operator and score expression based on config."""
     # Note: Today, we are not using ANN indices due to restrictions
     # on PGVector's support for mixing vector and non-vector filters
@@ -936,12 +945,22 @@ def _get_distance_operator(store: Any) -> str:
     config = cast(PostgresIndexConfig, store.index_config)
     distance_type = config.get("distance_type", "cosine")
 
+    # Return the operator and the score expression
+    # The operator is used in the CTE and will be compatible with an ASCENDING ORDER
+    # sort clause.
+    # The score expression is used in the final query and will be compatible with
+    # a DESCENDING ORDER sort clause and the user's expectations of what the similarity score
+    # should be.
     if distance_type == "l2":
-        return "1 - (sv.embedding <-> %s::%s)"
+        # Final: "-(sv.embedding <-> %s::%s)"
+        # We return the "l2 similarity" so that the sorting order is the same
+        return "sv.embedding <-> %s::%s", "-scored.neg_score"
     elif distance_type == "inner_product":
-        return "-(sv.embedding <#> %s::%s)"
-    else:  # cosine
-        return "1 - (sv.embedding <=> %s::%s)"
+        # Final: "-(sv.embedding <#> %s::%s)"
+        return "sv.embedding <#> %s::%s", "-(scored.neg_score)"
+    else:  # cosine similarity
+        # Final: "1 - (sv.embedding <=> %s::%s)"
+        return "sv.embedding <=> %s::%s", "1 - scored.neg_score"
 
 
 def _ensure_index_config(
diff --git a/libs/checkpoint-postgres/tests/test_store.py b/libs/checkpoint-postgres/tests/test_store.py
index c9d220fe0..35dfa2150 100644
--- a/libs/checkpoint-postgres/tests/test_store.py
+++ b/libs/checkpoint-postgres/tests/test_store.py
@@ -634,6 +634,7 @@ def test_embed_with_path_operation_config(
     distance_type: str,
 ) -> None:
     """Test operation-level field configuration for vector search."""
+
     with _create_vector_store(
         vector_type,
         distance_type,
@@ -695,3 +696,89 @@ def test_embed_with_path_operation_config(
     # assert len(results) == 3
     # doc5_result = next(r for r in results if r.key == "doc5")
     # assert doc5_result.score is None
+
+
+def _cosine_similarity(X: list[float], Y: list[list[float]]) -> list[float]:
+    """
+    Compute cosine similarity between a vector X and a matrix Y.
+    Pure-Python implementation; no numpy required.
+    """
+
+    similarities = []
+    for y in Y:
+        dot_product = sum(a * b for a, b in zip(X, y))
+        norm1 = sum(a * a for a in X) ** 0.5
+        norm2 = sum(a * a for a in y) ** 0.5
+        similarity = dot_product / (norm1 * norm2) if norm1 > 0 and norm2 > 0 else 0.0
+        similarities.append(similarity)
+
+    return similarities
+
+
+def _inner_product(X: list[float], Y: list[list[float]]) -> list[float]:
+    """
+    Compute inner product between a vector X and a matrix Y.
+    Pure-Python implementation; no numpy required.
+    """
+
+    similarities = []
+    for y in Y:
+        similarity = sum(a * b for a, b in zip(X, y))
+        similarities.append(similarity)
+
+    return similarities
+
+
+def _neg_l2_distance(X: list[float], Y: list[list[float]]) -> list[float]:
+    """
+    Compute the negated l2 distance between a vector X and a matrix Y.
+    Pure-Python implementation; no numpy required.
+    """
+
+    similarities = []
+    for y in Y:
+        similarity = sum((a - b) ** 2 for a, b in zip(X, y)) ** 0.5
+        similarities.append(-similarity)
+
+    return similarities
+
+
+@pytest.mark.parametrize(
+    "vector_type,distance_type",
+    [
+        ("vector", "cosine"),
+        ("vector", "inner_product"),
+        ("halfvec", "l2"),
+    ],
+)
+@pytest.mark.parametrize("query", ["aaa", "bbb", "ccc", "abcd", "poisson"])
+def test_scores(
+    fake_embeddings: CharacterEmbeddings,
+    vector_type: str,
+    distance_type: str,
+    query: str,
+) -> None:
+    """Test that returned search scores match the configured distance metric."""
+    with _create_vector_store(
+        vector_type,
+        distance_type,
+        fake_embeddings,
+        text_fields=["key0"],
+    ) as store:
+        doc = {
+            "key0": "aaa",
+        }
+        store.put(("test",), "doc", doc, index=["key0", "key1"])
+
+        results = store.search((), query=query)
+        vec0 = fake_embeddings.embed_query(doc["key0"])
+        vec1 = fake_embeddings.embed_query(query)
+        if distance_type == "cosine":
+            similarities = _cosine_similarity(vec1, [vec0])
+        elif distance_type == "inner_product":
+            similarities = _inner_product(vec1, [vec0])
+        elif distance_type == "l2":
+            similarities = _neg_l2_distance(vec1, [vec0])
+
+        assert len(results) == 1
+        assert results[0].score == pytest.approx(similarities[0], abs=1e-3)