Skip to content
This repository has been archived by the owner on Nov 13, 2024. It is now read-only.

Add support for namespaces #243

Merged
merged 26 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/canopy/chat_engine/chat_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ def chat(self,
messages: Messages,
*,
stream: bool = False,
model_params: Optional[dict] = None
model_params: Optional[dict] = None,
namespace: Optional[str] = None
) -> Union[ChatResponse, StreamingChatResponse]:
"""
Chat completion with RAG. Given a list of messages (history), the chat engine will generate the next response, based on the relevant context retrieved from the knowledge base.
Expand All @@ -180,6 +181,7 @@ def chat(self,
messages: A list of messages (history) to generate the next response from.
stream: A boolean flag to indicate if the chat should be streamed or not. Defaults to False.
model_params: A dictionary of model parameters to use for the LLM. Defaults to None, which means the LLM will use its default values.
namespace: The namespace of the index for context retrieval. To learn more about namespaces, see https://docs.pinecone.io/docs/namespaces

Returns:
A ChatResponse object if stream is False, or a StreamingChatResponse object if stream is True.
Expand All @@ -196,7 +198,7 @@ def chat(self,
>>> for chunk in response.chunks:
... print(chunk.json())
""" # noqa: E501
context = self._get_context(messages)
context = self._get_context(messages, namespace)
llm_messages = self._history_pruner.build(
chat_history=messages,
max_tokens=self.max_prompt_tokens,
Expand Down Expand Up @@ -227,9 +229,11 @@ def chat(self,

def _get_context(self,
messages: Messages,
namespace: Optional[str] = None
) -> Context:
queries = self._query_builder.generate(messages, self.max_prompt_tokens)
context = self.context_engine.query(queries, self.max_context_tokens)
context = self.context_engine.query(queries, self.max_context_tokens,
namespace=namespace)
return context

async def achat(self,
Expand Down
18 changes: 13 additions & 5 deletions src/canopy/context_engine/context_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
class BaseContextEngine(ABC, ConfigurableMixin):

@abstractmethod
def query(self, queries: List[Query], max_context_tokens: int, ) -> Context:
def query(self, queries: List[Query], max_context_tokens: int, *,
namespace: Optional[str]) -> Context:
pass

@abstractmethod
async def aquery(self, queries: List[Query], max_context_tokens: int, ) -> Context:
async def aquery(self, queries: List[Query], max_context_tokens: int, *,
namespace: Optional[str]) -> Context:
pass


Expand Down Expand Up @@ -81,13 +83,17 @@ def __init__(self,

self.global_metadata_filter = global_metadata_filter

def query(self, queries: List[Query], max_context_tokens: int, ) -> Context:
def query(self, queries: List[Query],
max_context_tokens: int,
*,
namespace: Optional[str] = None) -> Context:
izellevy marked this conversation as resolved.
Show resolved Hide resolved
"""
Query the knowledge base for relevant documents and build a context from the retrieved documents that can be injected into the LLM prompt.

Args:
queries: A list of queries to use for retrieving documents from the knowledge base
max_context_tokens: The maximum number of tokens to use for the context
namespace: The namespace to query in the underlying `KnowledgeBase`. To learn more about namespaces, see https://docs.pinecone.io/docs/namespaces

Returns:
A Context object containing the retrieved documents and metadata
Expand All @@ -100,12 +106,14 @@ def query(self, queries: List[Query], max_context_tokens: int, ) -> Context:
""" # noqa: E501
query_results = self.knowledge_base.query(
queries,
global_metadata_filter=self.global_metadata_filter)
global_metadata_filter=self.global_metadata_filter,
namespace=namespace)
context = self.context_builder.build(query_results, max_context_tokens)

if CE_DEBUG_INFO:
context.debug_info["query_results"] = [qr.dict() for qr in query_results]
return context

async def aquery(self, queries: List[Query], max_context_tokens: int, ) -> Context:
async def aquery(self, queries: List[Query], max_context_tokens: int,
namespace: Optional[str] = None) -> Context:
raise NotImplementedError()
6 changes: 4 additions & 2 deletions src/canopy/knowledge_base/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class BaseKnowledgeBase(ABC, ConfigurableMixin):
@abstractmethod
def query(self,
queries: List[Query],
global_metadata_filter: Optional[dict] = None
global_metadata_filter: Optional[dict] = None,
namespace: Optional[str] = None
) -> List[QueryResult]:
pass

Expand All @@ -38,7 +39,8 @@ def verify_index_connection(self) -> None:
@abstractmethod
async def aquery(self,
queries: List[Query],
global_metadata_filter: Optional[dict] = None
global_metadata_filter: Optional[dict] = None,
namespace: Optional[str] = None
) -> List[QueryResult]:
pass

Expand Down
19 changes: 12 additions & 7 deletions src/canopy/knowledge_base/knowledge_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from canopy.knowledge_base.reranker import Reranker, TransparentReranker
from canopy.models.data_models import Query, Document


INDEX_NAME_PREFIX = "canopy--"
TIMEOUT_INDEX_CREATE = 300
TIMEOUT_INDEX_PROVISION = 30
Expand Down Expand Up @@ -69,7 +68,6 @@ def list_canopy_indexes() -> List[str]:


class KnowledgeBase(BaseKnowledgeBase):

"""
The `KnowledgeBase` is used to store and retrieve text documents, using an underlying Pinecone index.
Every document is chunked into multiple text snippets based on the text structure (e.g. Markdown or HTML formatting)
Expand Down Expand Up @@ -401,7 +399,8 @@ def delete_index(self):

def query(self,
queries: List[Query],
global_metadata_filter: Optional[dict] = None
global_metadata_filter: Optional[dict] = None,
namespace: Optional[str] = None
) -> List[QueryResult]:
"""
Query the knowledge base to retrieve document chunks.
Expand All @@ -417,6 +416,8 @@ def query(self,
global_metadata_filter: A metadata filter to apply to all queries, in addition to any query-specific filters.
For example, the filter {"website": "wiki"} will only return documents with the metadata {"website": "wiki"} (in case provided in upsert)
see https://docs.pinecone.io/docs/metadata-filtering
namespace: The namespace that will be queried in the underlying index. To learn more about namespaces, see https://docs.pinecone.io/docs/namespaces

Returns:
A list of QueryResult objects.

Expand All @@ -436,7 +437,9 @@ def query(self,
raise RuntimeError(self._connection_error_msg)

queries = self._encoder.encode_queries(queries)
results = [self._query_index(q, global_metadata_filter) for q in queries]
results = [self._query_index(q,
global_metadata_filter,
namespace) for q in queries]
results = self._reranker.rerank(results)

return [
Expand All @@ -455,7 +458,8 @@ def query(self,

def _query_index(self,
query: KBQuery,
global_metadata_filter: Optional[dict]) -> KBQueryResult:
global_metadata_filter: Optional[dict],
namespace: Optional[str] = None) -> KBQueryResult:
if self._index is None:
raise RuntimeError(self._connection_error_msg)

Expand All @@ -471,7 +475,7 @@ def _query_index(self,
result = self._index.query(vector=query.values,
sparse_vector=query.sparse_values,
top_k=top_k,
namespace=query.namespace,
namespace=namespace,
filter=metadata_filter,
include_metadata=True,
_check_return_type=_check_return_type,
Expand Down Expand Up @@ -678,7 +682,8 @@ def _is_starter_env():

async def aquery(self,
queries: List[Query],
global_metadata_filter: Optional[dict] = None
global_metadata_filter: Optional[dict] = None,
namespace: Optional[str] = None
) -> List[QueryResult]:
raise NotImplementedError()

Expand Down
4 changes: 0 additions & 4 deletions src/canopy/models/data_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@

class Query(BaseModel):
text: str = Field(description="The query text.")
namespace: str = Field(
default="",
description="The namespace of the query. To learn more about namespaces, see https://docs.pinecone.io/docs/namespaces", # noqa: E501
)
metadata_filter: Optional[dict] = Field(
default=None,
description="A Pinecone metadata filter, to learn more about metadata filters, see https://docs.pinecone.io/docs/metadata-filtering", # noqa: E501
Expand Down
40 changes: 28 additions & 12 deletions src/canopy_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@


CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
DEFAULT_SERVER_URL = f"http://localhost:8000/{API_VERSION}"
DEFAULT_SERVER_URL = f"http://localhost:8000/{API_VERSION}/"
spinner = Spinner()


def check_server_health(url: str, timeout_seconds: int = 30):
try:
res = requests.get(urljoin(url, "/health"), timeout=timeout_seconds)
res = requests.get(urljoin(url, "health"), timeout=timeout_seconds)
res.raise_for_status()
return res.ok
except requests.exceptions.ConnectionError:
Expand Down Expand Up @@ -297,12 +297,17 @@ def _batch_documents_by_chunks(chunker: Chunker,
"long as less than 10% of the documents have failed to be uploaded.")
@click.option("--config", "-c", default=None, envvar="CANOPY_CONFIG_FILE",
help="Path to a canopy config file. Can also be set by the "
"`CANOPY_CONFIG_FILE` envrionment variable. Otherwise, the built-in"
"defualt configuration will be used.")
"`CANOPY_CONFIG_FILE` environment variable. Otherwise, the built-in"
"default configuration will be used.")
@click.option("--namespace", "-n", default="", envvar="INDEX_NAMESPACE",
              help="The namespace of the index. Can also be set by the "
                   "`INDEX_NAMESPACE` environment variable. If not set, the default "
                   "namespace will be used.")
def upsert(index_name: str,
data_path: str,
allow_failures: bool,
config: Optional[str]):
config: Optional[str],
namespace: str):
if index_name is None:
msg = (
"No index name provided. Please set --index-name or INDEX_NAME environment "
Expand Down Expand Up @@ -366,7 +371,7 @@ def upsert(index_name: str,
for batch in _batch_documents_by_chunks(kb._chunker, data,
batch_size=kb._encoder.batch_size):
try:
kb.upsert(batch)
kb.upsert(batch, namespace=namespace)
except Exception as e:
if allow_failures and len(failed_docs) < len(data) // 10:
failed_docs.extend([_.id for _ in batch])
Expand Down Expand Up @@ -403,6 +408,7 @@ def _chat(
api_base=None,
stream=True,
print_debug_info=False,
namespace=None
):
if openai_api_key is None:
openai_api_key = os.environ.get("OPENAI_API_KEY")
Expand All @@ -413,9 +419,14 @@ def _chat(
"Please set the OPENAI_API_KEY environment "
"variable."
)

if api_base is not None and namespace is not None:
api_base = urljoin(api_base, namespace)

client = openai.OpenAI(base_url=api_base, api_key=openai_api_key)

output = ""
history += [{"role": "user", "content": message}]
client = openai.OpenAI(base_url=api_base, api_key=openai_api_key)

start = time.time()
try:
Expand Down Expand Up @@ -488,7 +499,11 @@ def _chat(
@click.option("--chat-server-url", default=DEFAULT_SERVER_URL,
help=("URL of the Canopy server to use."
f" Defaults to {DEFAULT_SERVER_URL}"))
def chat(chat_server_url, rag, debug, stream):
@click.option("--namespace", "-n", default=None, envvar="INDEX_NAMESPACE",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit shooting ourselves in the leg, isn't it?
The point of using an API prefix is to tell users "to use namespaces, you simply change the base_url". If we're giving users a dedicated --namespace param - they might miss message.

Maybe we should give clear examples in the documentation (App API doc, CLI help messages, README) of using namespace by changing the API route?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, maybe it is a bit weird to enforce a namespace let's keep it dynamic. I am updating the docs also

              help="The namespace of the index. Can also be set by the "
                   "`INDEX_NAMESPACE` environment variable. If not set, the default "
                   "namespace will be used.")
def chat(chat_server_url, rag, debug, stream, namespace):
check_server_health(chat_server_url)
note_msg = (
"🚨 Note 🚨\n"
Expand Down Expand Up @@ -547,6 +562,7 @@ def chat(chat_server_url, rag, debug, stream):
openai_api_key="canopy",
api_base=chat_server_url,
print_debug_info=debug,
namespace=namespace
)

if not rag:
Expand Down Expand Up @@ -675,7 +691,7 @@ def stop(url):
raise CLIError(msg)

try:
res = requests.get(urljoin(url, "/shutdown"))
res = requests.get(urljoin(url, "shutdown"))
res.raise_for_status()
return res.ok
except requests.exceptions.ConnectionError:
Expand All @@ -693,8 +709,8 @@ def stop(url):
"""
)
)
@click.option("--url", default="http://localhost:8000",
help="Canopy's server url. Defaults to http://localhost:8000")
@click.option("--url", default=DEFAULT_SERVER_URL,
help=f"Canopy's server url. Defaults to {DEFAULT_SERVER_URL}")
def api_docs(url):
import webbrowser

Expand All @@ -720,7 +736,7 @@ def api_docs(url):
print(HTML_TEMPLATE % json.dumps(app.openapi()), file=fd)
webbrowser.open('file://' + os.path.realpath(filename))
else:
webbrowser.open(urljoin(url, "redoc"))
webbrowser.open(urljoin(url, "/redoc"))


if __name__ == "__main__":
Expand Down
Loading