This repository has been archived by the owner on Nov 13, 2024. It is now read-only.

Add support for namespaces #243

Merged · 26 commits · Jan 15, 2024
18 changes: 10 additions & 8 deletions README.md
@@ -156,6 +156,10 @@ Canopy supports files in `jsonl`, `parquet` and `csv` formats. Additionally, you

[This notebook](https://colab.research.google.com/github/pinecone-io/examples/blob/master/learn/generation/canopy/00-canopy-data-prep.ipynb) shows how to create a dataset in this format. Follow the instructions in the CLI when you upload your data.

> [!TIP]
> If you would like to separate your data into [namespaces](https://docs.pinecone.io/docs/namespaces),
> you can use the `--namespace` option or the `INDEX_NAMESPACE` environment variable.

### 3. Start the Canopy server

The Canopy server exposes Canopy's functionality via a REST API. Namely, it allows you to upload documents, retrieve relevant docs for a given query, and chat with your data. The server exposes a `/chat.completion` endpoint that can be easily integrated with any chat application.
@@ -219,21 +223,19 @@ This will open a similar chat interface window, but will show both the RAG and n
If you already have an application that uses the OpenAI API, you can migrate it to **Canopy** by simply changing the API endpoint to `http://host:port/v1`, for example with the default configuration:

```python
import openai

openai.api_base = "http://localhost:8000/v1"
from openai import OpenAI

# now you can use the OpenAI API as usual
client = OpenAI(base_url="http://localhost:8000/v1")
```

or without global state change:

If you would like to use a specific index namespace for chatting, you can just append the namespace to the API endpoint:
```python
import openai
from openai import OpenAI

openai_response = openai.Completion.create(..., api_base="http://localhost:8000/v1")
client = OpenAI(base_url="http://localhost:8000/v1/my-namespace")
```
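
For example, a chat request against the namespaced endpoint could look like the following sketch (the namespace, model name, and question are placeholders, and `OPENAI_API_KEY` is assumed to be set in the environment):

```python
from openai import OpenAI

# Scope the client to a hypothetical namespace served by the Canopy server.
client = OpenAI(base_url="http://localhost:8000/v1/my-namespace")

response = client.chat.completions.create(
    model="gpt-3.5-turbo",  # placeholder; the Canopy server's own config decides which LLM answers
    messages=[{"role": "user", "content": "What is Canopy?"}],
)
print(response.choices[0].message.content)
```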


### Running Canopy server in production

Canopy uses FastAPI as the web framework and Uvicorn as the ASGI server. It is recommended to use Gunicorn as the production server, mainly because it supports multiple worker processes and can handle multiple requests in parallel; more details can be found [here](https://www.uvicorn.org/deployment/#using-a-process-manager).
10 changes: 7 additions & 3 deletions src/canopy/chat_engine/chat_engine.py
@@ -165,7 +165,8 @@ def chat(self,
messages: Messages,
*,
stream: bool = False,
model_params: Optional[dict] = None
model_params: Optional[dict] = None,
namespace: Optional[str] = None
) -> Union[ChatResponse, StreamingChatResponse]:
"""
Chat completion with RAG. Given a list of messages (history), the chat engine will generate the next response, based on the relevant context retrieved from the knowledge base.
@@ -180,6 +181,7 @@ def chat(self,
messages: A list of messages (history) to generate the next response from.
stream: A boolean flag to indicate if the chat should be streamed or not. Defaults to False.
model_params: A dictionary of model parameters to use for the LLM. Defaults to None, which means the LLM will use its default values.
namespace: The namespace of the index for context retrieval. To learn more about namespaces, see https://docs.pinecone.io/docs/namespaces

Returns:
A ChatResponse object if stream is False, or a StreamingChatResponse object if stream is True.
@@ -196,7 +198,7 @@ def chat(self,
>>> for chunk in response.chunks:
... print(chunk.json())
""" # noqa: E501
context = self._get_context(messages)
context = self._get_context(messages, namespace)
llm_messages = self._history_pruner.build(
chat_history=messages,
max_tokens=self.max_prompt_tokens,
@@ -227,9 +229,11 @@

def _get_context(self,
messages: Messages,
namespace: Optional[str] = None
) -> Context:
queries = self._query_builder.generate(messages, self.max_prompt_tokens)
context = self.context_engine.query(queries, self.max_context_tokens)
context = self.context_engine.query(queries, self.max_context_tokens,
namespace=namespace)
return context

async def achat(self,
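A rough sketch of how the new `namespace` argument to `ChatEngine.chat` might be used from application code (index name, namespace, and message are placeholders; assumes Pinecone and OpenAI credentials are configured and the index is already populated):

```python
from canopy.chat_engine import ChatEngine
from canopy.context_engine import ContextEngine
from canopy.knowledge_base import KnowledgeBase
from canopy.models.data_models import UserMessage

kb = KnowledgeBase(index_name="my-index")
kb.connect()
chat_engine = ChatEngine(ContextEngine(knowledge_base=kb))

# Context is retrieved only from the given namespace; omitting it queries the default namespace.
response = chat_engine.chat(
    messages=[UserMessage(content="How do namespaces work?")],
    namespace="my-namespace",
)
print(response.choices[0].message.content)
```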
18 changes: 13 additions & 5 deletions src/canopy/context_engine/context_engine.py
@@ -15,11 +15,13 @@
class BaseContextEngine(ABC, ConfigurableMixin):

@abstractmethod
def query(self, queries: List[Query], max_context_tokens: int, ) -> Context:
def query(self, queries: List[Query], max_context_tokens: int, *,
namespace: Optional[str]) -> Context:
pass

@abstractmethod
async def aquery(self, queries: List[Query], max_context_tokens: int, ) -> Context:
async def aquery(self, queries: List[Query], max_context_tokens: int, *,
namespace: Optional[str]) -> Context:
pass


@@ -81,13 +83,17 @@ def __init__(self,

self.global_metadata_filter = global_metadata_filter

def query(self, queries: List[Query], max_context_tokens: int, ) -> Context:
def query(self, queries: List[Query],
max_context_tokens: int,
*,
namespace: Optional[str] = None) -> Context:
"""
Query the knowledge base for relevant documents and build a context from the retrieved documents that can be injected into the LLM prompt.

Args:
queries: A list of queries to use for retrieving documents from the knowledge base
max_context_tokens: The maximum number of tokens to use for the context
namespace: The namespace to query in the underlying `KnowledgeBase`. To learn more about namespaces, see https://docs.pinecone.io/docs/namespaces

Returns:
A Context object containing the retrieved documents and metadata
@@ -100,12 +106,14 @@ def query(self, queries: List[Query], max_context_tokens: int, ) -> Context:
""" # noqa: E501
query_results = self.knowledge_base.query(
queries,
global_metadata_filter=self.global_metadata_filter)
global_metadata_filter=self.global_metadata_filter,
namespace=namespace)
context = self.context_builder.build(query_results, max_context_tokens)

if CE_DEBUG_INFO:
context.debug_info["query_results"] = [qr.dict() for qr in query_results]
return context

async def aquery(self, queries: List[Query], max_context_tokens: int, ) -> Context:
async def aquery(self, queries: List[Query], max_context_tokens: int,
namespace: Optional[str] = None) -> Context:
raise NotImplementedError()
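
A short sketch of the updated `ContextEngine.query` call; `namespace` is keyword-only and defaults to `None`, i.e. the default namespace (index name and namespace below are placeholders, and a populated index is assumed):

```python
from canopy.context_engine import ContextEngine
from canopy.knowledge_base import KnowledgeBase
from canopy.models.data_models import Query

kb = KnowledgeBase(index_name="my-index")
kb.connect()
context_engine = ContextEngine(knowledge_base=kb)

context = context_engine.query(
    [Query(text="How are documents chunked?")],
    max_context_tokens=512,
    namespace="my-namespace",  # forwarded to KnowledgeBase.query()
)
print(context.to_text())
```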
6 changes: 4 additions & 2 deletions src/canopy/knowledge_base/base.py
@@ -14,7 +14,8 @@ class BaseKnowledgeBase(ABC, ConfigurableMixin):
@abstractmethod
def query(self,
queries: List[Query],
global_metadata_filter: Optional[dict] = None
global_metadata_filter: Optional[dict] = None,
namespace: Optional[str] = None
) -> List[QueryResult]:
pass

@@ -38,7 +39,8 @@ def verify_index_connection(self) -> None:
@abstractmethod
async def aquery(self,
queries: List[Query],
global_metadata_filter: Optional[dict] = None
global_metadata_filter: Optional[dict] = None,
namespace: Optional[str] = None
) -> List[QueryResult]:
pass

19 changes: 12 additions & 7 deletions src/canopy/knowledge_base/knowledge_base.py
@@ -25,7 +25,6 @@
from canopy.knowledge_base.reranker import Reranker, TransparentReranker
from canopy.models.data_models import Query, Document


INDEX_NAME_PREFIX = "canopy--"
TIMEOUT_INDEX_CREATE = 300
TIMEOUT_INDEX_PROVISION = 30
@@ -69,7 +68,6 @@ def list_canopy_indexes() -> List[str]:


class KnowledgeBase(BaseKnowledgeBase):

"""
The `KnowledgeBase` is used to store and retrieve text documents, using an underlying Pinecone index.
Every document is chunked into multiple text snippets based on the text structure (e.g. Markdown or HTML formatting)
@@ -401,7 +399,8 @@ def delete_index(self):

def query(self,
queries: List[Query],
global_metadata_filter: Optional[dict] = None
global_metadata_filter: Optional[dict] = None,
namespace: Optional[str] = None
) -> List[QueryResult]:
"""
Query the knowledge base to retrieve document chunks.
@@ -417,6 +416,8 @@ def query(self,
global_metadata_filter: A metadata filter to apply to all queries, in addition to any query-specific filters.
For example, the filter {"website": "wiki"} will only return documents with the metadata {"website": "wiki"} (if it was provided at upsert)
see https://docs.pinecone.io/docs/metadata-filtering
namespace: The namespace that will be queried in the underlying index. To learn more about namespaces, see https://docs.pinecone.io/docs/namespaces

Returns:
A list of QueryResult objects.

@@ -436,7 +437,9 @@
raise RuntimeError(self._connection_error_msg)

queries = self._encoder.encode_queries(queries)
results = [self._query_index(q, global_metadata_filter) for q in queries]
results = [self._query_index(q,
global_metadata_filter,
namespace) for q in queries]
results = self._reranker.rerank(results)

return [
@@ -455,7 +458,8 @@

def _query_index(self,
query: KBQuery,
global_metadata_filter: Optional[dict]) -> KBQueryResult:
global_metadata_filter: Optional[dict],
namespace: Optional[str] = None) -> KBQueryResult:
if self._index is None:
raise RuntimeError(self._connection_error_msg)

@@ -471,7 +475,7 @@ def _query_index(self,
result = self._index.query(vector=query.values,
sparse_vector=query.sparse_values,
top_k=top_k,
namespace=query.namespace,
namespace=namespace,
filter=metadata_filter,
include_metadata=True,
_check_return_type=_check_return_type,
@@ -678,7 +682,8 @@ def _is_starter_env():

async def aquery(self,
queries: List[Query],
global_metadata_filter: Optional[dict] = None
global_metadata_filter: Optional[dict] = None,
namespace: Optional[str] = None
) -> List[QueryResult]:
raise NotImplementedError()

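With this change the namespace is passed per call rather than on each `Query` object (see the `data_models.py` diff below). A rough sketch of the updated `KnowledgeBase.query` call, with placeholder names and an index that is assumed to already contain upserted documents:

```python
from canopy.knowledge_base import KnowledgeBase
from canopy.models.data_models import Query

kb = KnowledgeBase(index_name="my-index")
kb.connect()

results = kb.query(
    [Query(text="chunking strategy")],
    global_metadata_filter={"website": "wiki"},  # optional; applied to every query in the call
    namespace="my-namespace",                    # routed to the underlying Pinecone index
)
for result in results:
    for doc in result.documents:
        print(doc.id, doc.score)
```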
4 changes: 0 additions & 4 deletions src/canopy/models/data_models.py
@@ -12,10 +12,6 @@

class Query(BaseModel):
text: str = Field(description="The query text.")
namespace: str = Field(
default="",
description="The namespace of the query. To learn more about namespaces, see https://docs.pinecone.io/docs/namespaces", # noqa: E501
)
metadata_filter: Optional[dict] = Field(
default=None,
description="A Pinecone metadata filter, to learn more about metadata filters, see https://docs.pinecone.io/docs/metadata-filtering", # noqa: E501
46 changes: 33 additions & 13 deletions src/canopy_cli/cli.py
@@ -41,13 +41,13 @@


CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
DEFAULT_SERVER_URL = f"http://localhost:8000/{API_VERSION}"
DEFAULT_SERVER_URL = f"http://localhost:8000/{API_VERSION}/"
spinner = Spinner()


def check_server_health(url: str, timeout_seconds: int = 30):
try:
res = requests.get(urljoin(url, "/health"), timeout=timeout_seconds)
res = requests.get(urljoin(url, "health"), timeout=timeout_seconds)
res.raise_for_status()
return res.ok
except requests.exceptions.ConnectionError:
@@ -280,6 +280,9 @@ def _batch_documents_by_chunks(chunker: Chunker,

Load all the documents from a data file or a directory containing multiple data
files. The allowed formats are .jsonl, .parquet, .csv, and .txt.

If you would like to partition your data into namespaces, you can use the namespace parameter.
For more information see: https://docs.pinecone.io/docs/namespaces
""" # noqa: E501
)
)
@@ -297,12 +300,17 @@
"long as less than 10% of the documents have failed to be uploaded.")
@click.option("--config", "-c", default=None, envvar="CANOPY_CONFIG_FILE",
help="Path to a canopy config file. Can also be set by the "
"`CANOPY_CONFIG_FILE` envrionment variable. Otherwise, the built-in"
"defualt configuration will be used.")
"`CANOPY_CONFIG_FILE` environment variable. Otherwise, the built-in "
"default configuration will be used.")
@click.option("--namespace", "-n", default="", envvar="INDEX_NAMESPACE",
help="The namespace of the index. Can also be set by the "
"`INDEX_NAMESPACE` environment variable. If not set, the default "
"namespace will be used.")
def upsert(index_name: str,
data_path: str,
allow_failures: bool,
config: Optional[str]):
config: Optional[str],
namespace: str):
if index_name is None:
msg = (
"No index name provided. Please set --index-name or INDEX_NAME environment "
@@ -330,7 +338,10 @@ def upsert(index_name: str,
click.echo("Canopy is going to upsert data from ", nl=False)
click.echo(click.style(f'{data_path}', fg='yellow'), nl=False)
click.echo(" to index: ")
click.echo(click.style(f'{kb.index_name} \n', fg='green'))
click.echo(click.style(f'{kb.index_name}', fg='green'), nl=False)
click.echo(" using namespace: ", nl=False)
click.echo(click.style(f'{namespace or "default"} \n', fg='cyan'))

with spinner:
try:
data = load_from_path(data_path)
@@ -366,7 +377,7 @@ def upsert(index_name: str,
for batch in _batch_documents_by_chunks(kb._chunker, data,
batch_size=kb._encoder.batch_size):
try:
kb.upsert(batch)
kb.upsert(batch, namespace=namespace)
except Exception as e:
if allow_failures and len(failed_docs) < len(data) // 10:
failed_docs.extend([_.id for _ in batch])
@@ -402,7 +413,7 @@ def _chat(
openai_api_key=None,
api_base=None,
stream=True,
print_debug_info=False,
print_debug_info=False
):
if openai_api_key is None:
openai_api_key = os.environ.get("OPENAI_API_KEY")
@@ -413,9 +424,11 @@
"Please set the OPENAI_API_KEY environment "
"variable."
)

client = openai.OpenAI(base_url=api_base, api_key=openai_api_key)

output = ""
history += [{"role": "user", "content": message}]
client = openai.OpenAI(base_url=api_base, api_key=openai_api_key)

start = time.time()
try:
@@ -478,6 +491,7 @@ def _chat(
"""

)

)
@click.option("--stream/--no-stream", default=True,
help="Stream the response from the RAG chatbot word by word.")
@@ -581,6 +595,12 @@ def chat(chat_server_url, rag, debug, stream):

If you would like to try out the chatbot, run `canopy chat` in a separate
terminal window.

You can also use the OpenAI client to access the Canopy server just
by changing the API endpoint.

For more information see:
https://github.com/pinecone-io/canopy?tab=readme-ov-file#migrating-an-existing-openai-application-to-canopy
"""
)
)
@@ -675,7 +695,7 @@ def stop(url):
raise CLIError(msg)

try:
res = requests.get(urljoin(url, "/shutdown"))
res = requests.get(urljoin(url, "shutdown"))
res.raise_for_status()
return res.ok
except requests.exceptions.ConnectionError:
@@ -693,8 +713,8 @@ def stop(url):
"""
)
)
@click.option("--url", default="http://localhost:8000",
help="Canopy's server url. Defaults to http://localhost:8000")
@click.option("--url", default=DEFAULT_SERVER_URL,
help=f"Canopy's server url. Defaults to {DEFAULT_SERVER_URL}")
def api_docs(url):
import webbrowser

@@ -720,7 +740,7 @@ def api_docs(url):
print(HTML_TEMPLATE % json.dumps(app.openapi()), file=fd)
webbrowser.open('file://' + os.path.realpath(filename))
else:
webbrowser.open(urljoin(url, "redoc"))
webbrowser.open(urljoin(url, "/redoc"))


if __name__ == "__main__":
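The `--namespace` option (or the `INDEX_NAMESPACE` environment variable) is forwarded to `KnowledgeBase.upsert` for every batch. A minimal sketch of the equivalent programmatic call, with a placeholder document and namespace and credentials assumed to be configured:

```python
from canopy.knowledge_base import KnowledgeBase
from canopy.models.data_models import Document

kb = KnowledgeBase(index_name="my-index")
kb.connect()

docs = [Document(id="doc-1", text="Namespaces partition records within a Pinecone index.")]
# Roughly what `canopy upsert <path> --namespace my-namespace` does for each batch of documents.
kb.upsert(docs, namespace="my-namespace")
```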