Skip to content

Commit

Permalink
[postgres] Sort Ascending (#2594)
Browse files Browse the repository at this point in the history
Adds a few preliminaries:
1. Makes the returned "score" actually the result of the requested
operation (cosine, inner_product, l2)
2. Sorts ascending, so that if you were to add an HNSW index (and not
have any WHERE filters), it would be used
3. Drop the inner WHERE statement if no namespace or other filters are
provided. See (2) for why.
I don't yet add an index to the migrations since I think we need to
agree on the right balance to ensure it's actually used in common query
patterns.
  • Loading branch information
hinthornw authored Dec 3, 2024
1 parent c6fe265 commit 20f091a
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 12 deletions.
43 changes: 31 additions & 12 deletions libs/checkpoint-postgres/langgraph/store/postgres/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def _prepare_batch_search_queries(
if op.query and self.index_config:
embedding_requests.append((idx, op.query))

score_operator = _get_distance_operator(self)
score_operator, post_operator = _get_distance_operator(self)
vector_type = (
cast(PostgresIndexConfig, self.index_config)
.get("ann_index_config", {})
Expand Down Expand Up @@ -351,18 +351,28 @@ def _prepare_batch_search_queries(
if not filter_conditions
else " AND " + " AND ".join(filter_conditions)
)
if op.namespace_prefix:
prefix_filter_str = f"WHERE s.prefix LIKE %s {filter_str} "
ns_args: Sequence = (f"{_namespace_to_text(op.namespace_prefix)}%",)
else:
ns_args = ()
if filter_str:
prefix_filter_str = f"WHERE {filter_str} "
else:
prefix_filter_str = ""

base_query = f"""
WITH scored AS (
SELECT s.prefix, s.key, s.value, s.created_at, s.updated_at, {score_operator} AS score
SELECT s.prefix, s.key, s.value, s.created_at, s.updated_at, {score_operator} AS neg_score
FROM store s
JOIN store_vectors sv ON s.prefix = sv.prefix AND s.key = sv.key
WHERE s.prefix LIKE %s {filter_str}
ORDER BY {score_operator} DESC
{prefix_filter_str}
ORDER BY {score_operator} ASC
LIMIT %s
)
SELECT * FROM (
SELECT DISTINCT ON (prefix, key)
prefix, key, value, created_at, updated_at, score
prefix, key, value, created_at, updated_at, {post_operator} as score
FROM scored
ORDER BY prefix, key, score DESC
) AS unique_docs
Expand All @@ -372,7 +382,7 @@ def _prepare_batch_search_queries(
"""
params = [
_PLACEHOLDER, # Vector placeholder
f"{_namespace_to_text(op.namespace_prefix)}%",
*ns_args,
*filter_params,
_PLACEHOLDER,
expanded_limit,
Expand Down Expand Up @@ -702,7 +712,6 @@ def _batch_search_ops(
_paramslist[i] = embedding

for (idx, _), (query, params) in zip(search_ops, queries):
# Execute the actual query
cur.execute(query, params)
rows = cast(list[Row], cur.fetchall())
results[idx] = [
Expand Down Expand Up @@ -915,7 +924,7 @@ def _decode_ns_bytes(namespace: Union[str, bytes, list]) -> tuple[str, ...]:
return tuple(namespace.split("."))


def _get_distance_operator(store: Any) -> str:
def _get_distance_operator(store: Any) -> tuple[str, str]:
"""Get the distance operator and score expression based on config."""
# Note: Today, we are not using ANN indices due to restrictions
# on PGVector's support for mixing vector and non-vector filters
Expand All @@ -936,12 +945,22 @@ def _get_distance_operator(store: Any) -> str:
config = cast(PostgresIndexConfig, store.index_config)
distance_type = config.get("distance_type", "cosine")

# Return the operator and the score expression
# The operator is used in the CTE and will be compatible with an ASCENDING ORDER
# sort clause.
# The score expression is used in the final query and will be compatible with
# a DESCENDING ORDER sort clause and the user's expectations of what the similarity score
# should be.
if distance_type == "l2":
return "1 - (sv.embedding <-> %s::%s)"
# Final: "-(sv.embedding <-> %s::%s)"
# We return the "l2 similarity" so that the sorting order is the same
return "sv.embedding <-> %s::%s", "-scored.neg_score"
elif distance_type == "inner_product":
return "-(sv.embedding <#> %s::%s)"
else: # cosine
return "1 - (sv.embedding <=> %s::%s)"
# Final: "-(sv.embedding <#> %s::%s)"
return "sv.embedding <#> %s::%s", "-(scored.neg_score)"
else: # cosine similarity
# Final: "1 - (sv.embedding <=> %s::%s)"
return "sv.embedding <=> %s::%s", "1 - scored.neg_score"


def _ensure_index_config(
Expand Down
87 changes: 87 additions & 0 deletions libs/checkpoint-postgres/tests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ def test_embed_with_path_operation_config(
distance_type: str,
) -> None:
"""Test operation-level field configuration for vector search."""

with _create_vector_store(
vector_type,
distance_type,
Expand Down Expand Up @@ -695,3 +696,89 @@ def test_embed_with_path_operation_config(
# assert len(results) == 3
# doc5_result = next(r for r in results if r.key == "doc5")
# assert doc5_result.score is None


def _cosine_similarity(X: list[float], Y: list[list[float]]) -> list[float]:
"""
Compute cosine similarity between a vector X and a matrix Y.
Lazy import numpy for efficiency.
"""

similarities = []
for y in Y:
dot_product = sum(a * b for a, b in zip(X, y))
norm1 = sum(a * a for a in X) ** 0.5
norm2 = sum(a * a for a in y) ** 0.5
similarity = dot_product / (norm1 * norm2) if norm1 > 0 and norm2 > 0 else 0.0
similarities.append(similarity)

return similarities


def _inner_product(X: list[float], Y: list[list[float]]) -> list[float]:
"""
Compute inner product between a vector X and a matrix Y.
Lazy import numpy for efficiency.
"""

similarities = []
for y in Y:
similarity = sum(a * b for a, b in zip(X, y))
similarities.append(similarity)

return similarities


def _neg_l2_distance(X: list[float], Y: list[list[float]]) -> list[float]:
"""
Compute l2 distance between a vector X and a matrix Y.
Lazy import numpy for efficiency.
"""

similarities = []
for y in Y:
similarity = sum((a - b) ** 2 for a, b in zip(X, y)) ** 0.5
similarities.append(-similarity)

return similarities


@pytest.mark.parametrize(
    "vector_type,distance_type",
    [
        ("vector", "cosine"),
        ("vector", "inner_product"),
        ("halfvec", "l2"),
    ],
)
@pytest.mark.parametrize("query", ["aaa", "bbb", "ccc", "abcd", "poisson"])
def test_scores(
    fake_embeddings: CharacterEmbeddings,
    vector_type: str,
    distance_type: str,
    query: str,
) -> None:
    """Verify that search scores match the configured distance metric.

    For each (vector_type, distance_type) pair, stores one document, searches
    with several queries, and checks the returned score against a pure-Python
    reference computation of the same metric.
    """
    with _create_vector_store(
        vector_type,
        distance_type,
        fake_embeddings,
        text_fields=["key0"],
    ) as store:
        doc = {
            "key0": "aaa",
        }
        # NOTE(review): "key1" is listed here but the doc has no such field —
        # presumably exercising indexing of a missing path; confirm intent.
        store.put(("test",), "doc", doc, index=["key0", "key1"])

        results = store.search((), query=query)
        # Recompute the expected similarity locally from the same embeddings.
        vec0 = fake_embeddings.embed_query(doc["key0"])
        vec1 = fake_embeddings.embed_query(query)
        if distance_type == "cosine":
            similarities = _cosine_similarity(vec1, [vec0])
        elif distance_type == "inner_product":
            similarities = _inner_product(vec1, [vec0])
        elif distance_type == "l2":
            # l2 is negated so larger score still means "more similar".
            similarities = _neg_l2_distance(vec1, [vec0])

        assert len(results) == 1
        # Loose tolerance: halfvec storage is half-precision.
        assert results[0].score == pytest.approx(similarities[0], abs=1e-3)

0 comments on commit 20f091a

Please sign in to comment.