feat: Add elastic store (#34)
* add elastic search store
* feat add vector search store
* update documentation
karllu3 authored May 27, 2024
1 parent bc912c1 commit edd6de9
Showing 15 changed files with 569 additions and 18 deletions.
1 change: 0 additions & 1 deletion docs/how-to/custom_views_code.py
@@ -1,6 +1,5 @@
# pylint: disable=missing-return-doc, missing-param-doc, missing-function-docstring, missing-class-docstring, missing-raises-doc
import dbally
import os
import asyncio
from dataclasses import dataclass
from typing import Iterable, Callable, Any
3 changes: 0 additions & 3 deletions docs/how-to/pandas_views_code.py
@@ -1,9 +1,6 @@
# pylint: disable=missing-return-doc, missing-param-doc, missing-function-docstring, missing-class-docstring, missing-raises-doc
import dbally
import os
import asyncio
from dataclasses import dataclass
from typing import Iterable, Callable, Any
import pandas as pd

from dbally import decorators, DataFrameBaseView
103 changes: 103 additions & 0 deletions docs/how-to/use_elastic_store.md
@@ -0,0 +1,103 @@
# How-To Use Elastic to Store Similarity Index

[ElasticStore](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-store.html) can be used as a store in SimilarityIndex. This guide shows how to run a similarity search using Elasticsearch.
In the example, the Elasticsearch engine is provided by the official Docker image. Two approaches to similarity search are available: Elastic Search Store and Elastic Vector Search.
Elastic Search Store uses embeddings and kNN search to find similar values, while Elastic Vector Search performs semantic search, using the ELSER (Elastic Learned Sparse EncodeR) model to encode and search the data.
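
To make the "embeddings and kNN search" idea concrete, here is a minimal sketch in plain Python. It is not how `ElasticsearchStore` is implemented: the three-dimensional vectors are invented for illustration, and cosine similarity is assumed as the distance measure.

```python
import math
from typing import Dict, List, Tuple

# Toy 3-dimensional "embeddings", invented for illustration only.
# A real embedding client returns much longer vectors.
TOY_EMBEDDINGS: Dict[str, List[float]] = {
    "United States": [0.9, 0.1, 0.0],
    "Canada": [0.7, 0.6, 0.1],
    "Mexico": [0.1, 0.2, 0.9],
}


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def knn(query: List[float], k: int = 1) -> List[Tuple[str, float]]:
    """Return the k stored values whose vectors are closest to the query."""
    scored = [(name, cosine_similarity(query, vec)) for name, vec in TOY_EMBEDDINGS.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


# A query vector pointing in almost the same direction as "United States".
print(knn([0.85, 0.15, 0.05], k=2))
```

In the real store, the query vector would come from the embedding client and the nearest-neighbour lookup would be delegated to Elasticsearch.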


## Prerequisites

[Download and deploy the Elasticsearch Docker image](https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html). Note that for Elastic Vector Search the Elasticsearch Docker container requires at least 8GB of RAM (the `-m 2GB` limit in the command below is lower than that, so raise it for this use case) and
[license activation](https://www.elastic.co/guide/en/kibana/current/managing-licenses.html) to use Machine Learning capabilities.


```commandline
docker network create elastic
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.13.4
docker run --name es01 --net elastic -p 9200:9200 -it -m 2GB docker.elastic.co/elasticsearch/elasticsearch:8.13.4
```

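Copy the generated `elastic` password and enrollment token. These credentials are only shown the first time you start Elasticsearch. If you lose them, they can be regenerated with the `elasticsearch-reset-password` and `elasticsearch-create-enrollment-token` tools shipped in the container. With the password exported as `ELASTIC_PASSWORD`, the following commands copy the CA certificate out of the container and verify that the cluster is reachable.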
```commandline
docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .
curl --cacert http_ca.crt -u elastic:$ELASTIC_PASSWORD https://localhost:9200
```
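
The same connectivity check can be sketched in Python using only the standard library. This is an illustrative equivalent of the `curl` call above, not part of dbally; the helper names `build_request` and `check_cluster` are hypothetical.

```python
import base64
import json
import os
import ssl
import urllib.request


def build_request(url: str, user: str, password: str) -> urllib.request.Request:
    """Build a GET request carrying HTTP basic-auth credentials."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})


def check_cluster(url: str, user: str, password: str, ca_cert_path: str) -> dict:
    """Call the cluster root endpoint, trusting only the copied CA certificate."""
    context = ssl.create_default_context(cafile=ca_cert_path)
    request = build_request(url, user, password)
    with urllib.request.urlopen(request, context=context, timeout=10) as response:
        return json.load(response)


if __name__ == "__main__" and "ELASTIC_PASSWORD" in os.environ:
    # Only runs when the password is exported, as in the curl example.
    print(check_cluster("https://localhost:9200", "elastic", os.environ["ELASTIC_PASSWORD"], "http_ca.crt"))
```

Passing the CA file to `ssl.create_default_context` plays the role of `--cacert`, and the `Authorization` header plays the role of `-u`.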

To manage the Elasticsearch engine, create a Kibana container.
```commandline
docker run --name kib01 --net elastic -p 5601:5601 docker.elastic.co/kibana/kibana:8.13.4
```

By default, the Kibana management dashboard is available at [http://localhost:5601/](http://localhost:5601/).


For vector search, it is necessary to enroll in an [appropriate subscription level](https://www.elastic.co/subscriptions) or trial version that supports machine learning.
Additionally, the [ELSER model must be downloaded](https://www.elastic.co/guide/en/machine-learning/current/ml-nlp-elser.html), which can be done through Kibana. Instructions can be found in the hosted Kibana instance under tabs:
<br />downloading and deploying model - **Analytics -> Machine Learning -> Trained Model**,
<br />vector search configuration - **Search -> Elastic Search -> Vector Search.**


* Install the Elasticsearch extension:
```commandline
pip install dbally[elasticsearch]
```

## Implementing a SimilarityIndex

To use similarity search, you need to define a data fetcher and a data store.

### Data fetcher

```python
class DummyCountryFetcher(SimilarityFetcher):
    async def fetch(self):
        return ["United States", "Canada", "Mexico"]
```
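
Because `fetch` is a coroutine, it has to be awaited. The sketch below shows this with a stand-in class that has the same method shape but deliberately does not inherit from dbally's `SimilarityFetcher`, so it runs without dbally installed. The class and function names are hypothetical.

```python
import asyncio
from typing import List


class StandInCountryFetcher:
    """Stand-in with the same async `fetch` shape as the fetcher above."""

    async def fetch(self) -> List[str]:
        return ["United States", "Canada", "Mexico"]


async def preview() -> List[str]:
    # `fetch` is a coroutine function, so its result must be awaited.
    return await StandInCountryFetcher().fetch()


countries = asyncio.run(preview())
print(countries)
```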

### Data store
Elastic store similarity search works on embeddings. To create the embeddings, an embedding client is passed to the store as an argument.
You can use [one of dbally embedding clients][dbally.embeddings.EmbeddingClient], such as [LiteLLMEmbeddingClient][dbally.embeddings.LiteLLMEmbeddingClient].

```python
from dbally.embeddings.litellm import LiteLLMEmbeddingClient

embedding_client = LiteLLMEmbeddingClient(api_key="your-api-key")
```

Then define your [`ElasticsearchStore`][dbally.similarity.ElasticsearchStore]:

```python
from dbally.similarity.elasticsearch_store import ElasticsearchStore

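# Note: no trailing comma after the closing parenthesis, otherwise
# `data_store` would be a one-element tuple instead of a store.
data_store = ElasticsearchStore(
    index_name="country_similarity",
    host="https://localhost:9200",
    ca_cert_path="path_to_cert/http_ca.crt",
    http_user="elastic",
    http_password="password",
    embedding_client=embedding_client,
)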

```
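
Rather than hard-coding credentials, the example scripts in this commit read them from environment variables. A possible way to collect and validate them in one place is sketched below. The `ElasticSettings` dataclass and `load_settings` helper are hypothetical and not part of dbally; only the environment variable names and keyword argument names are taken from the examples.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ElasticSettings:
    """Connection settings; field names mirror the store's keyword arguments."""

    host: str
    ca_cert_path: str
    http_user: str
    http_password: str


# Environment variable names as used in the example scripts of this commit.
_ENV_TO_FIELD = {
    "ELASTIC_STORE_CONNECTION_STRING": "host",
    "ELASTIC_CERT_PATH": "ca_cert_path",
    "ELASTIC_AUTH_USER": "http_user",
    "ELASTIC_USER_PASSWORD": "http_password",
}


def load_settings(env: Mapping[str, str] = os.environ) -> ElasticSettings:
    """Build settings from the environment, reporting every missing variable at once."""
    missing = [name for name in _ENV_TO_FIELD if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return ElasticSettings(**{field: env[name] for name, field in _ENV_TO_FIELD.items()})
```

The resulting values could then be passed to the store, for example via `dataclasses.asdict(settings)` unpacked as keyword arguments.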

After this setup, you can initialize the SimilarityIndex:

```python
from dbally.similarity import SimilarityIndex

country_similarity = SimilarityIndex(
    fetcher=DummyCountryFetcher(),
    store=data_store,
)
```

and [update it and find the closest matches in the same way as in built-in similarity indices](use_custom_similarity_store.md/#using-the-similar)
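
The update-then-lookup workflow can be illustrated with a toy, in-memory stand-in. This is only a sketch: `ToySimilarityIndex` is hypothetical, uses string similarity from `difflib` instead of embeddings, and the method name `similar` is an assumption about the index interface (`update` is the call used in the example scripts).

```python
import asyncio
import difflib
from typing import List


class ToySimilarityIndex:
    """Hypothetical in-memory stand-in for a fetcher + store pair."""

    def __init__(self, values_source: List[str]) -> None:
        self._source = values_source
        self._stored: List[str] = []

    async def update(self) -> None:
        # Step 1: copy the fetched values into the "store".
        self._stored = list(self._source)

    async def similar(self, text: str) -> str:
        # Step 2: return the closest stored value, or the input if nothing is close.
        matches = difflib.get_close_matches(text, self._stored, n=1, cutoff=0.6)
        return matches[0] if matches else text


async def demo() -> str:
    index = ToySimilarityIndex(["United States", "Canada", "Mexico"])
    await index.update()
    return await index.similar("Untied States")


print(asyncio.run(demo()))  # prints "United States"
```

The key point is the ordering: the index must be updated before lookups, otherwise the store is empty and nothing can match.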

To use Elastic Vector Search instead, download and deploy the [ELSER v2](https://www.elastic.co/guide/en/machine-learning/current/ml-nlp-elser.html#elser-v2) model and create an [ingest pipeline](https://www.elastic.co/guide/en/machine-learning/current/ml-nlp-elser.html#elasticsearch-ingest-pipeline).
You can then use the index to map user input to the closest matching value. For example, a user may type 'United States' and the index would return 'USA'.
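
As a rough intuition for how a sparse model such as ELSER can match 'United States' to 'USA', the sketch below scores documents by the dot product of weighted token expansions. All tokens and weights are invented for illustration, and the scoring is a simplification, not Elasticsearch's actual implementation.

```python
from typing import Dict

SparseVector = Dict[str, float]

# Invented token weights; a real ELSER expansion has many more tokens.
DOCS: Dict[str, SparseVector] = {
    "USA": {"usa": 2.1, "united": 1.4, "states": 1.3, "america": 1.2},
    "Canada": {"canada": 2.3, "ottawa": 0.9, "america": 0.4},
    "Mexico": {"mexico": 2.2, "america": 0.5},
}


def sparse_score(query: SparseVector, doc: SparseVector) -> float:
    """Dot product over the tokens that both expansions share."""
    return sum(weight * doc[token] for token, weight in query.items() if token in doc)


def best_match(query: SparseVector) -> str:
    """Return the stored value with the highest sparse score."""
    return max(DOCS, key=lambda name: sparse_score(query, DOCS[name]))


# Hypothetical expansion of the user input "United States".
query_expansion = {"united": 1.8, "states": 1.7, "usa": 0.9, "america": 0.6}
print(best_match(query_expansion))
```

Because the stored value 'USA' is expanded to related tokens such as "united" and "states", it can match a query that never contains the literal string "USA".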

## Links
* [Similarity Indexes](use_custom_similarity_store.md)
* [Example Elastic Search Store](use_elasticsearch_store_code.py)
* [Example Elastic Vector Search](use_elastic_vector_store_code.py)
102 changes: 102 additions & 0 deletions docs/how-to/use_elastic_vector_store_code.py
@@ -0,0 +1,102 @@
# pylint: disable=missing-return-doc, missing-param-doc, missing-function-docstring
import os
import asyncio
from typing_extensions import Annotated

import asyncclick as click
from dotenv import load_dotenv
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base

import dbally
from dbally import decorators, SqlAlchemyBaseView
from dbally.audit.event_handlers.cli_event_handler import CLIEventHandler
from dbally.llms.litellm import LiteLLM
from dbally.similarity import SimpleSqlAlchemyFetcher, SimilarityIndex
from dbally.similarity.elastic_vector_search import ElasticVectorStore

load_dotenv()
engine = create_engine("sqlite:///candidates.db")


Base = automap_base()
Base.prepare(autoload_with=engine)

Candidate = Base.classes.candidates

country_similarity = SimilarityIndex(
    fetcher=SimpleSqlAlchemyFetcher(
        engine,
        table=Candidate,
        column=Candidate.country,
    ),
    store=ElasticVectorStore(
        index_name="country_vector_similarity",
        host=os.environ["ELASTIC_STORE_CONNECTION_STRING"],
        ca_cert_path=os.environ["ELASTIC_CERT_PATH"],
        http_user=os.environ["ELASTIC_AUTH_USER"],
        http_password=os.environ["ELASTIC_USER_PASSWORD"],
    ),
)


class CandidateView(SqlAlchemyBaseView):
    """
    A view for retrieving candidates from the database.
    """

    def get_select(self) -> sqlalchemy.Select:
        """
        Creates the initial SqlAlchemy select object, which will be used to build the query.
        """
        return sqlalchemy.select(Candidate)

    @decorators.view_filter()
    def at_least_experience(self, years: int) -> sqlalchemy.ColumnElement:
        """
        Filters candidates with at least `years` of experience.
        """
        return Candidate.years_of_experience >= years

    @decorators.view_filter()
    def senior_data_scientist_position(self) -> sqlalchemy.ColumnElement:
        """
        Filters candidates that can be considered for a senior data scientist position.
        """
        return sqlalchemy.and_(
            Candidate.position.in_(["Data Scientist", "Machine Learning Engineer", "Data Engineer"]),
            Candidate.years_of_experience >= 3,
        )

    @decorators.view_filter()
    def from_country(self, country: Annotated[str, country_similarity]) -> sqlalchemy.ColumnElement:
        """
        Filters candidates from a specific country.
        """
        return Candidate.country == country


@click.command()
@click.argument("country", type=str, default="United States")
@click.argument("years_of_experience", type=str, default="2")
async def main(country="United States", years_of_experience="2"):
    await country_similarity.update()

    llm = LiteLLM(model_name="gpt-3.5-turbo", api_key=os.environ["OPENAI_API_KEY"])
    collection = dbally.create_collection("recruitment", llm, event_handlers=[CLIEventHandler()])
    collection.add(CandidateView, lambda: CandidateView(engine))

    result = await collection.ask(
        f"Find someone from the {country} with more than {years_of_experience} years of experience."
    )

    print(f"The generated SQL query is: {result.context.get('sql')}")
    print()
    print(f"Retrieved {len(result.results)} candidates:")
    for candidate in result.results:
        print(candidate)


if __name__ == "__main__":
    asyncio.run(main())
106 changes: 106 additions & 0 deletions docs/how-to/use_elasticsearch_store_code.py
@@ -0,0 +1,106 @@
# pylint: disable=missing-return-doc, missing-param-doc, missing-function-docstring
import os
import asyncio
from typing_extensions import Annotated

import asyncclick as click
from dotenv import load_dotenv
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base

import dbally
from dbally import decorators, SqlAlchemyBaseView
from dbally.audit.event_handlers.cli_event_handler import CLIEventHandler
from dbally.similarity import SimpleSqlAlchemyFetcher, SimilarityIndex
from dbally.embeddings.litellm import LiteLLMEmbeddingClient
from dbally.llms.litellm import LiteLLM
from dbally.similarity.elasticsearch_store import ElasticsearchStore

load_dotenv()
engine = create_engine("sqlite:///candidates.db")


Base = automap_base()
Base.prepare(autoload_with=engine)

Candidate = Base.classes.candidates

country_similarity = SimilarityIndex(
    fetcher=SimpleSqlAlchemyFetcher(
        engine,
        table=Candidate,
        column=Candidate.country,
    ),
    store=ElasticsearchStore(
        index_name="country_similarity",
        host=os.environ["ELASTIC_STORE_CONNECTION_STRING"],
        ca_cert_path=os.environ["ELASTIC_CERT_PATH"],
        http_user=os.environ["ELASTIC_AUTH_USER"],
        http_password=os.environ["ELASTIC_USER_PASSWORD"],
        embedding_client=LiteLLMEmbeddingClient(
            api_key=os.environ["OPENAI_API_KEY"],
        ),
    ),
)


class CandidateView(SqlAlchemyBaseView):
    """
    A view for retrieving candidates from the database.
    """

    def get_select(self) -> sqlalchemy.Select:
        """
        Creates the initial SqlAlchemy select object, which will be used to build the query.
        """
        return sqlalchemy.select(Candidate)

    @decorators.view_filter()
    def at_least_experience(self, years: int) -> sqlalchemy.ColumnElement:
        """
        Filters candidates with at least `years` of experience.
        """
        return Candidate.years_of_experience >= years

    @decorators.view_filter()
    def senior_data_scientist_position(self) -> sqlalchemy.ColumnElement:
        """
        Filters candidates that can be considered for a senior data scientist position.
        """
        return sqlalchemy.and_(
            Candidate.position.in_(["Data Scientist", "Machine Learning Engineer", "Data Engineer"]),
            Candidate.years_of_experience >= 3,
        )

    @decorators.view_filter()
    def from_country(self, country: Annotated[str, country_similarity]) -> sqlalchemy.ColumnElement:
        """
        Filters candidates from a specific country.
        """
        return Candidate.country == country


@click.command()
@click.argument("country", type=str, default="United States")
@click.argument("years_of_experience", type=str, default="2")
async def main(country="United States", years_of_experience="2"):
    # Populate the similarity index once before asking questions.
    await country_similarity.update()

    llm = LiteLLM(model_name="gpt-3.5-turbo", api_key=os.environ["OPENAI_API_KEY"])
    collection = dbally.create_collection("recruitment", llm, event_handlers=[CLIEventHandler()])
    collection.add(CandidateView, lambda: CandidateView(engine))

    result = await collection.ask(
        f"Find someone from the {country} with more than {years_of_experience} years of experience."
    )

    print(f"The generated SQL query is: {result.context.get('sql')}")
    print()
    print(f"Retrieved {len(result.results)} candidates:")
    for candidate in result.results:
        print(candidate)


if __name__ == "__main__":
    asyncio.run(main())
11 changes: 8 additions & 3 deletions docs/quickstart/quickstart2_code.py
@@ -1,28 +1,30 @@
# pylint: disable=missing-return-doc, missing-param-doc, missing-function-docstring
import dbally
import os
import asyncio
from typing_extensions import Annotated

from dotenv import load_dotenv
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.automap import automap_base

import dbally
from dbally import decorators, SqlAlchemyBaseView
from dbally.audit.event_handlers.cli_event_handler import CLIEventHandler
from dbally.similarity import SimpleSqlAlchemyFetcher, FaissStore, SimilarityIndex
from dbally.embeddings.litellm import LiteLLMEmbeddingClient
from dbally.llms.litellm import LiteLLM

engine = create_engine('sqlite:///candidates.db')
load_dotenv()
engine = create_engine("sqlite:///candidates.db")

Base = automap_base()
Base.prepare(autoload_with=engine)

Candidate = Base.classes.candidates

country_similarity = SimilarityIndex(
fetcher=SimpleSqlAlchemyFetcher(
fetcher=SimpleSqlAlchemyFetcher(
engine,
table=Candidate,
column=Candidate.country,
@@ -37,10 +39,12 @@
),
)


class CandidateView(SqlAlchemyBaseView):
"""
A view for retrieving candidates from the database.
"""

def get_select(self) -> sqlalchemy.Select:
"""
Creates the initial SqlAlchemy select object, which will be used to build the query.
@@ -71,6 +75,7 @@ def from_country(self, country: Annotated[str, country_similarity]) -> sqlalchem
"""
return Candidate.country == country


async def main():
await country_similarity.update()
