Merge pull request #26 from prrao87/refactor
Refactor all modules for performance and cleaner code
prrao87 authored Apr 29, 2023
2 parents be49d18 + dd592e4 commit 2314010
Showing 25 changed files with 602 additions and 308 deletions.
1 change: 1 addition & 0 deletions dbs/elasticsearch/api/config.py
@@ -7,6 +7,7 @@ class Settings(BaseSettings):
    elastic_password: str
    elastic_url: str
    elastic_port: int
+    elastic_index_alias: str
    tag: str

    class Config:
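For reference, a minimal sketch of how the new field is resolved at runtime via pydantic v1's BaseSettings (the ELASTIC_INDEX_ALIAS variable name matches what the bulk-index script previously read through os.environ; everything else here is illustrative):

from pydantic import BaseSettings


class Settings(BaseSettings):
    # Only the fields relevant to this sketch are shown
    elastic_url: str
    elastic_port: int
    elastic_index_alias: str


# BaseSettings matches field names to environment variables case-insensitively,
# so ELASTIC_INDEX_ALIAS=wines populates settings.elastic_index_alias as a str
settings = Settings()
print(settings.elastic_index_alias)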
2 changes: 1 addition & 1 deletion dbs/elasticsearch/docker-compose.yml
@@ -1,4 +1,4 @@
version: "3"
version: "3.9"

services:
elasticsearch:
3 changes: 1 addition & 2 deletions dbs/elasticsearch/requirements.txt
@@ -1,8 +1,7 @@
elasticsearch>=8.7.0
-pydantic>=1.10.7, <2.0.0
+pydantic[dotenv]>=1.10.7, <2.0.0
fastapi>=0.95.0, <1.0.0
httpx>=0.24.0
aiohttp>=3.8.4
uvicorn>=0.21.0, <1.0.0
-python-dotenv>=1.0.0
srsly>=2.4.6
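The python-dotenv pin can be dropped because the pydantic[dotenv] extra lets BaseSettings read the .env file itself. A minimal sketch of the idea (the .env filename is an assumption based on the project's use of load_dotenv elsewhere):

from pydantic import BaseSettings


class Settings(BaseSettings):
    elastic_index_alias: str

    class Config:
        # With the dotenv extra installed, pydantic loads this file directly,
        # so a separate load_dotenv() call is no longer needed for settings
        env_file = ".env"


settings = Settings()  # reads ELASTIC_INDEX_ALIAS from the environment or .env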
137 changes: 79 additions & 58 deletions dbs/elasticsearch/scripts/bulk_index.py
@@ -1,9 +1,10 @@
import argparse
import asyncio
import json
import os
import sys
import warnings
from concurrent.futures import ProcessPoolExecutor
from functools import lru_cache, partial
from pathlib import Path
from typing import Any, Iterator

@@ -13,6 +14,7 @@
from pydantic.main import ModelMetaclass

sys.path.insert(1, os.path.realpath(Path(__file__).resolve().parents[1]))
from api.config import Settings
from schemas.wine import Wine

load_dotenv()
@@ -27,6 +29,12 @@ class FileNotFoundError(Exception):
# --- Blocking functions ---


@lru_cache()
def get_settings():
# Use lru_cache to avoid loading .env file for every request
return Settings()


def chunk_iterable(item_list: list[JsonBlob], chunksize: int) -> Iterator[tuple[JsonBlob, ...]]:
"""
Break a large iterable into an iterable of smaller iterables of size `chunksize`
@@ -58,15 +66,20 @@ def validate(
return validated_data


def process_chunks(data: list[JsonBlob]) -> tuple[list[JsonBlob], str]:
validated_data = validate(data, Wine, exclude_none=True)
return validated_data


# --- Async functions ---


async def get_elastic_client() -> AsyncElasticsearch:
async def get_elastic_client(settings) -> AsyncElasticsearch:
# Get environment variables
USERNAME = os.environ.get("ELASTIC_USER")
PASSWORD = os.environ.get("ELASTIC_PASSWORD")
PORT = os.environ.get("ELASTIC_PORT")
ELASTIC_URL = os.environ.get("ELASTIC_URL")
USERNAME = settings.elastic_user
PASSWORD = settings.elastic_password
PORT = settings.elastic_port
ELASTIC_URL = settings.elastic_url
# Connect to ElasticSearch
elastic_client = AsyncElasticsearch(
f"http://{ELASTIC_URL}:{PORT}",
@@ -79,64 +92,70 @@ async def get_elastic_client() -> AsyncElasticsearch:
return elastic_client


async def create_index(client: AsyncElasticsearch, mappings_path: Path) -> None:
async def create_index(client: AsyncElasticsearch, index: str, mappings_path: Path) -> None:
"""Create an index associated with an alias in ElasticSearch"""
with open(mappings_path, "rb") as f:
config = json.load(f)

INDEX_ALIAS = os.environ.get("ELASTIC_INDEX_ALIAS")
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# Get settings and mappings from the mappings.json file
mappings = config.get("mappings")
settings = config.get("settings")
index_name = f"{INDEX_ALIAS}-1"
try:
await client.indices.create(index=index_name, mappings=mappings, settings=settings)
await client.indices.put_alias(index=index_name, name=INDEX_ALIAS)
# Verify that the new index has been created
assert await client.indices.exists(index=index_name)
index_and_alias = await client.indices.get_alias(index=index_name)
print(index_and_alias)
except Exception as e:
print(f"Warning: Did not create index {index_name} due to exception {e}\n")
elastic_config = dict(srsly.read_json(mappings_path))
assert elastic_config is not None

if not client.indices.exists_alias(name=index):
print(f"Did not find index {index} in db, creating index...\n")
with warnings.catch_warnings():
warnings.simplefilter("ignore")
# Get settings and mappings from the mappings.json file
mappings = elastic_config.get("mappings")
settings = elastic_config.get("settings")
index_name = f"{index}-1"
try:
await client.indices.create(index=index_name, mappings=mappings, settings=settings)
await client.indices.put_alias(index=index_name, name=INDEX_ALIAS)
# Verify that the new index has been created
assert await client.indices.exists(index=index_name)
index_and_alias = await client.indices.get_alias(index=index_name)
print(index_and_alias)
except Exception as e:
print(f"Warning: Did not create index {index_name} due to exception {e}\n")
else:
print(f"Found index {index} in db, skipping index creation...\n")


async def bulk_index_wines_to_elastic(
client: AsyncElasticsearch, index: str, wines: list[Wine]
async def update_documents_to_index(
client: AsyncElasticsearch, index: str, data: list[Wine]
) -> None:
"""Bulk index a wine JsonBlob to ElasticSearch"""
async for success, response in helpers.async_streaming_bulk(
await helpers.async_bulk(
client,
wines,
data,
index=index,
chunk_size=5000,
max_retries=3,
initial_backoff=3,
max_backoff=10,
):
if not success:
print(f"A document failed to index: {response}")
chunk_size=CHUNKSIZE,
)
ids = [item["id"] for item in data]
print(f"Processed ids in range {min(ids)}-{max(ids)}")


async def main(chunked_data: Iterator[tuple[JsonBlob, ...]]) -> None:
async def main(data: list[JsonBlob], index: str) -> None:
settings = get_settings()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
elastic_client = await get_elastic_client()
INDEX_ALIAS = os.environ.get("ELASTIC_INDEX_ALIAS")
if not elastic_client.indices.exists_alias(name=INDEX_ALIAS):
print(f"Did not find index {INDEX_ALIAS} in db, creating index...\n")
await create_index(elastic_client, Path("mapping/mapping.json"))
counter = 0
for chunk in chunked_data:
validated_data = validate(chunk, Wine)
counter += len(validated_data)
ids = [item["id"] for item in validated_data]
try:
await bulk_index_wines_to_elastic(elastic_client, INDEX_ALIAS, validated_data)
print(f"Indexed {counter} items")
except Exception as e:
print(f"{e}: Failed to index items in the ID range {min(ids)}-{max(ids)} to db")
elastic_client = await get_elastic_client(settings)
await create_index(elastic_client, index, Path("mapping/mapping.json"))

# Process multiple chunks of data in a process pool to avoid blocking the event loop
print("Processing chunks")
chunked_data = chunk_iterable(data, CHUNKSIZE)

with ProcessPoolExecutor() as pool:
loop = asyncio.get_running_loop()
executor_tasks = [partial(process_chunks, chunk) for chunk in chunked_data]
awaitables = [loop.run_in_executor(pool, call) for call in executor_tasks]
# Attach process pool to running event loop so that we can process multiple chunks in parallel
validated_data = await asyncio.gather(*awaitables)
tasks = [
update_documents_to_index(elastic_client, index, data) for data in validated_data
]
try:
await asyncio.gather(*tasks)
print("Finished execution!")
except Exception as e:
print(f"{e}: Error while indexing to db")
# Close AsyncElasticsearch client
await elastic_client.close()

@@ -155,12 +174,14 @@ async def main(chunked_data: Iterator[tuple[JsonBlob, ...]]) -> None:
FILENAME = args["filename"]
CHUNKSIZE = args["chunksize"]

# Specify an alias to index the data under
INDEX_ALIAS = get_settings().elastic_index_alias
assert INDEX_ALIAS

data = list(get_json_data(DATA_DIR, FILENAME))
if LIMIT > 0:
data = data[:LIMIT]

chunked_data = chunk_iterable(data, CHUNKSIZE)

# Run main async event loop
asyncio.run(main(chunked_data))
print("Finished execution!")
if data:
asyncio.run(main(data, INDEX_ALIAS))
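The performance-oriented part of this refactor is that CPU-bound pydantic validation now runs in a ProcessPoolExecutor attached to the event loop, while the async Elasticsearch bulk calls stay on the loop. A stripped-down sketch of that pattern, independent of Elasticsearch (all names and numbers below are illustrative only):

import asyncio
from concurrent.futures import ProcessPoolExecutor
from functools import partial


def cpu_bound_validate(chunk: list[int]) -> list[int]:
    # Stand-in for validating one chunk of records with pydantic
    return [x * 2 for x in chunk]


async def index_chunk(chunk: list[int]) -> None:
    # Stand-in for an async bulk-index call against the database
    await asyncio.sleep(0)
    print(f"indexed {len(chunk)} items")


async def main(data: list[int], chunksize: int) -> None:
    chunks = [data[i : i + chunksize] for i in range(0, len(data), chunksize)]
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # Validate all chunks in worker processes without blocking the event loop
        calls = [partial(cpu_bound_validate, chunk) for chunk in chunks]
        validated = await asyncio.gather(*[loop.run_in_executor(pool, call) for call in calls])
    # Index the validated chunks concurrently
    await asyncio.gather(*[index_chunk(chunk) for chunk in validated])


if __name__ == "__main__":
    asyncio.run(main(list(range(10)), chunksize=3))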
2 changes: 1 addition & 1 deletion dbs/meilisearch/api/config.py
@@ -4,7 +4,7 @@
class Settings(BaseSettings):
    meili_service: str
    meili_master_key: str
-    meili_port: str
+    meili_port: int
    meili_url: str
    tag: str

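Typing meili_port as int means pydantic coerces the MEILI_PORT environment string to an integer at startup and raises a clear validation error if it is not numeric. A tiny illustration (the port value is hypothetical):

import os

from pydantic import BaseSettings


class Settings(BaseSettings):
    meili_port: int


os.environ["MEILI_PORT"] = "7700"
print(Settings().meili_port + 1)  # 7701 -- already an int, no manual casting needed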
2 changes: 1 addition & 1 deletion dbs/meilisearch/docker-compose.yml
@@ -1,4 +1,4 @@
version: "3"
version: "3.9"

services:
meilisearch:
3 changes: 1 addition & 2 deletions dbs/meilisearch/requirements.txt
@@ -1,8 +1,7 @@
meilisearch-python-async==1.2.0
-pydantic>=1.10.7, <2.0.0
+pydantic[dotenv]>=1.10.7, <2.0.0
fastapi>=0.95.0, <1.0.0
httpx>=0.24.0
aiohttp>=3.8.4
uvicorn>=0.21.0, <1.0.0
-python-dotenv>=1.0.0
srsly>=2.4.6
36 changes: 16 additions & 20 deletions dbs/meilisearch/scripts/bulk_index.py
@@ -2,7 +2,6 @@

import argparse
import asyncio
import json
import os
import sys
from concurrent.futures import ProcessPoolExecutor
@@ -70,14 +69,13 @@ def validate(
return validated_data


def process_file(data: list[JsonBlob]) -> tuple[list[JsonBlob], str]:
def process_chunks(data: list[JsonBlob]) -> tuple[list[JsonBlob], str]:
validated_data = validate(data, Wine, exclude_none=True)
return validated_data


def get_meili_settings(filename: str) -> MeilisearchSettings:
with open(filename, "r") as f:
settings = json.load(f)
settings = dict(srsly.read_json(filename))
# Convert to MeilisearchSettings pydantic model object
settings = MeilisearchSettings(**settings)
return settings
@@ -89,12 +87,10 @@ def get_meili_settings(filename: str) -> MeilisearchSettings:
async def update_documents_to_index(index: Index, primary_key: str, data: list[JsonBlob]) -> None:
ids = [item[primary_key] for item in data]
await index.update_documents(data, primary_key)
print(f"Indexed {len(ids)} ids in range {min(ids)}-{max(ids)} to db")
print(f"Processed ids in range {min(ids)}-{max(ids)}")


async def main(
chunked_data: Iterator[tuple[JsonBlob, ...]], meili_settings: MeilisearchSettings
) -> None:
async def main(data: list[JsonBlob], meili_settings: MeilisearchSettings) -> None:
settings = Settings()
URI = f"http://{settings.meili_url}:{settings.meili_port}"
MASTER_KEY = settings.meili_master_key
@@ -107,17 +103,20 @@ async def main(
await client.index(index_name).update_settings(meili_settings)
print("Finished updating database index settings")

print("Processing files")
with ProcessPoolExecutor() as process_pool:
# Attach process pool to running event loop so that we can process multiple files in parallel
# Process multiple chunks of data in a process pool to avoid blocking the event loop
print("Processing chunks")
chunked_data = chunk_iterable(data, CHUNKSIZE)

with ProcessPoolExecutor() as pool:
loop = asyncio.get_running_loop()
calls = [partial(process_file, chunk) for chunk in chunked_data]
call_coroutines = [loop.run_in_executor(process_pool, call) for call in calls]
# Gather and run document update coroutines
coroutines = await asyncio.gather(*call_coroutines)
tasks = [update_documents_to_index(index, primary_key, data) for data in coroutines]
executor_tasks = [partial(process_chunks, chunk) for chunk in chunked_data]
awaitables = [loop.run_in_executor(pool, call) for call in executor_tasks]
# Attach process pool to running event loop so that we can process multiple chunks in parallel
validated_data = await asyncio.gather(*awaitables)
tasks = [update_documents_to_index(index, primary_key, data) for data in validated_data]
try:
await asyncio.gather(*tasks)
print("Finished execution!")
except Exception as e:
print(f"{e}: Error while indexing to db")

@@ -140,11 +139,8 @@ async def main(
if LIMIT > 0:
data = data[:LIMIT]

chunked_data = chunk_iterable(data, CHUNKSIZE)

meili_settings = get_meili_settings(filename="settings/settings.json")

# Run main async event loop
if data:
asyncio.run(main(chunked_data, meili_settings))
print(f"Finished indexing {len(data)} JSONL files to db")
asyncio.run(main(data, meili_settings))
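Both bulk loaders now read their JSON settings/mappings through srsly instead of a raw open()/json.load() pair. A minimal sketch of the substitution (the settings path comes from the script above; the key looked up is illustrative):

import srsly

# srsly.read_json accepts a path and returns the parsed object directly
settings = dict(srsly.read_json("settings/settings.json"))
print(settings.get("rankingRules"))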
4 changes: 3 additions & 1 deletion dbs/neo4j/api/config.py
@@ -2,7 +2,9 @@


class Settings(BaseSettings):
-    db_service: str
+    neo4j_service: str
+    neo4j_url: str
+    neo4j_user: str
    neo4j_password: str
    tag: str

6 changes: 3 additions & 3 deletions dbs/neo4j/api/main.py
@@ -19,15 +19,15 @@ def get_settings():
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Async context manager for MongoDB connection."""
    settings = get_settings()
-    service = settings.db_service
+    service = settings.neo4j_service
    URI = f"bolt://{service}:7687"
-    AUTH = ("neo4j", settings.neo4j_password)
+    AUTH = (settings.neo4j_user, settings.neo4j_password)
    async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
        async with driver.session(database="neo4j") as session:
            app.session = session
            print("Successfully connected to wine reviews Neo4j DB")
            yield
-            print("Successfully closed wine reviews Neo4j connection")
+            print("Successfully closed wine reviews Neo4j connection")


app = FastAPI(
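For context, a minimal sketch of how a lifespan handler like this plugs into the app. The @asynccontextmanager decorator and the FastAPI(lifespan=...) hookup follow FastAPI's standard lifespan API rather than this diff, and the connection values are hypothetical:

from contextlib import asynccontextmanager

from fastapi import FastAPI
from neo4j import AsyncGraphDatabase


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Hold one async driver/session open for the application's lifetime
    async with AsyncGraphDatabase.driver("bolt://neo4j:7687", auth=("neo4j", "password")) as driver:
        async with driver.session(database="neo4j") as session:
            app.session = session
            yield  # the application serves requests while the session is open


app = FastAPI(lifespan=lifespan)


@app.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}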
6 changes: 3 additions & 3 deletions dbs/neo4j/docker-compose.yml
@@ -1,7 +1,7 @@
-version: '3'
+version: '3.9'

services:
-  db:
+  neo4j:
    container_name: neo4j_wine
    image: neo4j:${NEO4J_VERSION}
    restart: unless-stopped
@@ -32,7 +32,7 @@ services:
    ports:
      - ${API_PORT}:8000
    depends_on:
-      - db
+      - neo4j
    volumes:
      - ./:/wine
    networks:
5 changes: 2 additions & 3 deletions dbs/neo4j/requirements.txt
@@ -1,9 +1,8 @@
-neo4j>=5.7.0
-pydantic>=1.10.7, <2.0.0
+neo4j>=5.8.0
+pydantic[dotenv]>=1.10.7, <2.0.0
fastapi>=0.95.0, <1.0.0
httpx>=0.24.0
aiohttp>=3.8.4
uvloop>=0.17.0
uvicorn>=0.21.0, <1.0.0
-python-dotenv>=1.0.0
srsly>=2.4.6
