Skip to content

Commit

Permalink
feat!: Pinecone - support for the new API (#793)
Browse files Browse the repository at this point in the history
* upgrade to new API

* increase sleep time

* update example

* address feedback
  • Loading branch information
anakin87 authored Jun 10, 2024
1 parent 1f71c72 commit 4dda1b9
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 54 deletions.
2 changes: 1 addition & 1 deletion integrations/pinecone/examples/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@

document_store = PineconeDocumentStore(
api_key=Secret.from_token("YOUR-PINECONE-API-KEY"),
environment="gcp-starter",
index="default",
namespace="default",
dimension=768,
spec={"serverless": {"region": "us-east-1", "cloud": "aws"}},
)

indexing = Pipeline()
Expand Down
3 changes: 1 addition & 2 deletions integrations/pinecone/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ classifiers = [
]
dependencies = [
"haystack-ai",
"pinecone-client<3", # our implementation is not compatible with pinecone-client>=3
# see https://github.com/deepset-ai/haystack-core-integrations/issues/223
"pinecone-client>=3" # our implementation is not compatible with pinecone-client <3
]

[project.urls]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io
import logging
from copy import copy
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Literal, Optional

import pandas as pd
from haystack import default_from_dict, default_to_dict
Expand All @@ -13,7 +13,7 @@
from haystack.utils import Secret, deserialize_secrets_inplace
from haystack.utils.filters import convert

import pinecone
from pinecone import Pinecone, PodSpec, ServerlessSpec

from .filters import _normalize_filters

Expand All @@ -25,6 +25,9 @@
TOP_K_LIMIT = 1_000


DEFAULT_STARTER_PLAN_SPEC = {"serverless": {"region": "us-east-1", "cloud": "aws"}}


class PineconeDocumentStore:
"""
A Document Store using [Pinecone vector database](https://www.pinecone.io/).
Expand All @@ -34,41 +37,48 @@ def __init__(
self,
*,
api_key: Secret = Secret.from_env_var("PINECONE_API_KEY"), # noqa: B008
environment: str = "us-west1-gcp",
index: str = "default",
namespace: str = "default",
batch_size: int = 100,
dimension: int = 768,
**index_creation_kwargs,
spec: Optional[Dict[str, Any]] = None,
metric: Literal["cosine", "euclidean", "dotproduct"] = "cosine",
):
"""
Creates a new PineconeDocumentStore instance.
It is meant to be connected to a Pinecone index and namespace.
:param api_key: The Pinecone API key.
:param environment: The Pinecone environment to connect to.
:param index: The Pinecone index to connect to. If the index does not exist, it will be created.
:param namespace: The Pinecone namespace to connect to. If the namespace does not exist, it will be created
at the first write.
:param batch_size: The number of documents to write in a single batch. When setting this parameter,
consider [documented Pinecone limits](https://docs.pinecone.io/docs/limits).
consider [documented Pinecone limits](https://docs.pinecone.io/reference/quotas-and-limits).
:param dimension: The dimension of the embeddings. This parameter is only used when creating a new index.
:param index_creation_kwargs: Additional keyword arguments to pass to the index creation method.
You can find the full list of supported arguments in the
[API reference](https://docs.pinecone.io/reference/create_index).
:param spec: The Pinecone spec to use when creating a new index. Allows choosing between serverless and pod
deployment options and setting additional parameters. Refer to the
[Pinecone documentation](https://docs.pinecone.io/reference/api/control-plane/create_index) for more
details.
If not provided, a default spec with serverless deployment in the `us-east-1` region will be used
(compatible with the free tier).
:param metric: The metric to use for similarity search. This parameter is only used when creating a new index.
"""
self.api_key = api_key
spec = spec or DEFAULT_STARTER_PLAN_SPEC

pinecone.init(api_key=api_key.resolve_value(), environment=environment)
client = Pinecone(api_key=api_key.resolve_value(), source_tag="haystack")

if index not in pinecone.list_indexes():
if index not in client.list_indexes().names():
logger.info(f"Index {index} does not exist. Creating a new index.")
pinecone.create_index(name=index, dimension=dimension, **index_creation_kwargs)
pinecone_spec = self._convert_dict_spec_to_pinecone_object(spec)
client.create_index(name=index, dimension=dimension, spec=pinecone_spec, metric=metric)
else:
logger.info(f"Index {index} already exists. Connecting to it.")
logger.info(
f"Index {index} already exists. Connecting to it. `dimension`, `spec`, and `metric` will be ignored."
)

self._index = pinecone.Index(index_name=index)
self._index = client.Index(name=index)

actual_dimension = self._index.describe_index_stats().get("dimension")
if actual_dimension and actual_dimension != dimension:
Expand All @@ -80,11 +90,28 @@ def __init__(
self.dimension = actual_dimension or dimension

self._dummy_vector = [-10.0] * self.dimension
self.environment = environment
self.index = index
self.namespace = namespace
self.batch_size = batch_size
self.index_creation_kwargs = index_creation_kwargs
self.metric = metric
self.spec = spec

@staticmethod
def _convert_dict_spec_to_pinecone_object(spec: Dict[str, Any]):
    """
    Build a Pinecone spec object (ServerlessSpec or PodSpec) from its dict representation.

    :param spec: A dict with a single top-level key, either `serverless` or `pod`, whose
        value holds the keyword arguments for the corresponding Pinecone spec class.
    :raises ValueError: If the dict contains neither a `serverless` nor a `pod` key.
    """
    # Dispatch on the top-level key; `serverless` takes precedence, matching the
    # original check order.
    for key, spec_class in (("serverless", ServerlessSpec), ("pod", PodSpec)):
        if key in spec:
            return spec_class(**spec[key])

    msg = (
        "Invalid spec. Must contain either `serverless` or `pod` key. "
        "Refer to https://docs.pinecone.io/reference/api/control-plane/create_index for more details."
    )
    raise ValueError(msg)

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "PineconeDocumentStore":
Expand All @@ -107,12 +134,12 @@ def to_dict(self) -> Dict[str, Any]:
return default_to_dict(
self,
api_key=self.api_key.to_dict(),
environment=self.environment,
spec=self.spec,
index=self.index,
dimension=self.dimension,
namespace=self.namespace,
batch_size=self.batch_size,
**self.index_creation_kwargs,
metric=self.metric,
)

def count_documents(self) -> int:
Expand Down
4 changes: 1 addition & 3 deletions integrations/pinecone/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from haystack_integrations.document_stores.pinecone import PineconeDocumentStore

# This is the approximate time it takes for the documents to be available
SLEEP_TIME = 30
SLEEP_TIME = 10


@pytest.fixture()
Expand All @@ -21,14 +21,12 @@ def document_store(request):
This is the most basic requirement for the child class: provide
an instance of this document store so the base class can use it.
"""
environment = "gcp-starter"
index = "default"
# Use a different namespace for each test so we can run them in parallel
namespace = f"{request.node.name}-{int(time.time())}"
dimension = 768

store = PineconeDocumentStore(
environment=environment,
index=index,
namespace=namespace,
dimension=dimension,
Expand Down
98 changes: 80 additions & 18 deletions integrations/pinecone/tests/test_document_store.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,59 @@
import os
import time
from unittest.mock import patch

import numpy as np
import pytest
from haystack import Document
from haystack.testing.document_store import CountDocumentsTest, DeleteDocumentsTest, WriteDocumentsTest
from haystack.utils import Secret
from pinecone import Pinecone, PodSpec, ServerlessSpec

from haystack_integrations.document_stores.pinecone import PineconeDocumentStore


@patch("haystack_integrations.document_stores.pinecone.document_store.Pinecone")
def test_init(mock_pinecone):
    """The store passes its settings to the Pinecone client and reads the real dimension back."""
    # The existing index reports dimension 60, which must override the requested 30.
    mock_pinecone.return_value.Index.return_value.describe_index_stats.return_value = {"dimension": 60}

    document_store = PineconeDocumentStore(
        api_key=Secret.from_token("fake-api-key"),
        index="my_index",
        namespace="test",
        batch_size=50,
        dimension=30,
        metric="euclidean",
    )

    mock_pinecone.assert_called_with(api_key="fake-api-key", source_tag="haystack")

    assert document_store.index == "my_index"
    assert document_store.namespace == "test"
    assert document_store.batch_size == 50
    # The dimension of the existing index wins over the constructor argument.
    assert document_store.dimension == 60
    assert document_store.metric == "euclidean"


@patch("haystack_integrations.document_stores.pinecone.document_store.Pinecone")
def test_init_api_key_in_environment_variable(mock_pinecone, monkeypatch):
    """When no explicit Secret is passed, the API key is resolved from PINECONE_API_KEY."""
    monkeypatch.setenv("PINECONE_API_KEY", "env-api-key")

    PineconeDocumentStore(
        index="my_index",
        namespace="test",
        batch_size=50,
        dimension=30,
        metric="euclidean",
    )

    # The client must be built with the env-provided key and the haystack source tag.
    mock_pinecone.assert_called_with(api_key="env-api-key", source_tag="haystack")


@patch("haystack_integrations.document_stores.pinecone.document_store.pinecone")
@patch("haystack_integrations.document_stores.pinecone.document_store.Pinecone")
def test_to_from_dict(mock_pinecone, monkeypatch):
mock_pinecone.Index.return_value.describe_index_stats.return_value = {"dimension": 30}
mock_pinecone.return_value.Index.return_value.describe_index_stats.return_value = {"dimension": 60}
monkeypatch.setenv("PINECONE_API_KEY", "env-api-key")
document_store = PineconeDocumentStore(
environment="gcp-starter",
index="my_index",
namespace="test",
batch_size=50,
Expand All @@ -73,34 +71,98 @@ def test_to_from_dict(mock_pinecone, monkeypatch):
"strict": True,
"type": "env_var",
},
"environment": "gcp-starter",
"index": "my_index",
"dimension": 30,
"dimension": 60,
"namespace": "test",
"batch_size": 50,
"metric": "euclidean",
"spec": {"serverless": {"region": "us-east-1", "cloud": "aws"}},
},
}
assert document_store.to_dict() == dict_output

document_store = PineconeDocumentStore.from_dict(dict_output)
assert document_store.environment == "gcp-starter"
assert document_store.api_key == Secret.from_env_var("PINECONE_API_KEY", strict=True)
assert document_store.index == "my_index"
assert document_store.namespace == "test"
assert document_store.batch_size == 50
assert document_store.dimension == 30
assert document_store.dimension == 60
assert document_store.metric == "euclidean"
assert document_store.spec == {"serverless": {"region": "us-east-1", "cloud": "aws"}}


def test_init_fails_wo_api_key(monkeypatch):
    """Without PINECONE_API_KEY set, resolving the default Secret must raise ValueError."""
    monkeypatch.delenv("PINECONE_API_KEY", raising=False)
    with pytest.raises(ValueError):
        # NOTE: `environment` was removed from the new-API constructor; passing it
        # would raise TypeError before the ValueError this test is about.
        PineconeDocumentStore(
            index="my_index",
        )


def test_convert_dict_spec_to_pinecone_object_serverless():
    """A spec dict with a `serverless` key is converted to a ServerlessSpec object."""
    spec = {"serverless": {"region": "us-east-1", "cloud": "aws"}}

    result = PineconeDocumentStore._convert_dict_spec_to_pinecone_object(spec)

    assert isinstance(result, ServerlessSpec)
    assert result.region == "us-east-1"
    assert result.cloud == "aws"


def test_convert_dict_spec_to_pinecone_object_pod():
    """A spec dict with a `pod` key is converted to a PodSpec with all fields carried over."""
    pod_config = {"replicas": 1, "shards": 1, "pods": 1, "pod_type": "p1.x1", "environment": "us-west1-gcp"}

    result = PineconeDocumentStore._convert_dict_spec_to_pinecone_object({"pod": pod_config})

    assert isinstance(result, PodSpec)
    # Every key of the input config must land on the resulting spec object unchanged.
    for attribute, expected in pod_config.items():
        assert getattr(result, attribute) == expected


def test_convert_dict_spec_to_pinecone_object_fail():
    """A spec dict with neither `serverless` nor `pod` is rejected with ValueError."""
    bad_spec = {
        "strange_key": {"replicas": 1, "shards": 1, "pods": 1, "pod_type": "p1.x1", "environment": "us-west1-gcp"}
    }

    with pytest.raises(ValueError):
        PineconeDocumentStore._convert_dict_spec_to_pinecone_object(bad_spec)


@pytest.mark.integration
@pytest.mark.skipif("PINECONE_API_KEY" not in os.environ, reason="PINECONE_API_KEY not set")
def test_serverless_index_creation_from_scratch(sleep_time):
    """The store creates a serverless index with the requested dimension, metric, and spec."""
    index_name = "my-serverless-index"

    client = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
    # Best-effort removal of a leftover index from a previous run.
    try:
        client.delete_index(name=index_name)
    except Exception:  # noqa: S110
        pass

    # Give Pinecone time to finish the deletion before recreating the index.
    time.sleep(sleep_time)

    try:
        PineconeDocumentStore(
            index=index_name,
            namespace="test",
            batch_size=50,
            dimension=30,
            metric="euclidean",
            spec={"serverless": {"region": "us-east-1", "cloud": "aws"}},
        )

        index_description = client.describe_index(name=index_name)
        assert index_description["name"] == index_name
        assert index_description["dimension"] == 30
        assert index_description["metric"] == "euclidean"
        assert index_description["spec"]["serverless"]["region"] == "us-east-1"
        assert index_description["spec"]["serverless"]["cloud"] == "aws"
    finally:
        # Always delete the index, even when an assertion fails, so the
        # integration account does not accumulate orphaned indexes.
        try:
            client.delete_index(name=index_name)
        except Exception:  # noqa: S110
            pass


@pytest.mark.integration
@pytest.mark.skipif("PINECONE_API_KEY" not in os.environ, reason="PINECONE_API_KEY not set")
class TestDocumentStore(CountDocumentsTest, DeleteDocumentsTest, WriteDocumentsTest):
Expand Down
Loading

0 comments on commit 4dda1b9

Please sign in to comment.