Skip to content

Commit

Permalink
Update indexing script
Browse files Browse the repository at this point in the history
  • Loading branch information
prrao87 committed Oct 23, 2023
1 parent 778e80f commit 0e0884a
Showing 1 changed file with 17 additions and 26 deletions.
43 changes: 17 additions & 26 deletions dbs/elasticsearch/scripts/bulk_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import sys
import warnings
from concurrent.futures import ProcessPoolExecutor
from functools import lru_cache, partial
from pathlib import Path
from typing import Any, Iterator
Expand Down Expand Up @@ -60,7 +59,7 @@ def validate(
data: tuple[JsonBlob],
exclude_none: bool = False,
) -> list[JsonBlob]:
validated_data = [Wine(**item).dict(exclude_none=exclude_none) for item in data]
validated_data = [Wine(**item).model_dump(exclude_none=exclude_none) for item in data]
return validated_data


Expand Down Expand Up @@ -95,7 +94,8 @@ async def create_index(client: AsyncElasticsearch, index: str, mappings_path: Pa
elastic_config = dict(srsly.read_json(mappings_path))
assert elastic_config is not None

if not client.indices.exists_alias(name=index):
exists_alias = await client.indices.exists_alias(name=index)
if not exists_alias:
print(f"Did not find index {index} in db, creating index...\n")
with warnings.catch_warnings():
warnings.simplefilter("ignore")
Expand Down Expand Up @@ -129,31 +129,22 @@ async def update_documents_to_index(
print(f"Processed ids in range {min(ids)}-{max(ids)}")


async def main(data: list[JsonBlob], index: str) -> None:
async def main(data: list[JsonBlob]) -> None:
settings = get_settings()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
elastic_client = await get_elastic_client(settings)
await create_index(elastic_client, index, Path("mapping/mapping.json"))

# Process multiple chunks of data in a process pool to avoid blocking the event loop
print("Processing chunks")
chunked_data = chunk_iterable(data, CHUNKSIZE)

with ProcessPoolExecutor() as pool:
loop = asyncio.get_running_loop()
executor_tasks = [partial(process_chunks, chunk) for chunk in chunked_data]
awaitables = [loop.run_in_executor(pool, call) for call in executor_tasks]
# Attach process pool to running event loop so that we can process multiple chunks in parallel
validated_data = await asyncio.gather(*awaitables)
tasks = [
update_documents_to_index(elastic_client, index, data) for data in validated_data
]
try:
await asyncio.gather(*tasks)
print("Finished execution!")
except Exception as e:
print(f"{e}: Error while indexing to db")
assert await elastic_client.ping()
await create_index(elastic_client, INDEX_ALIAS, Path("mapping/mapping.json"))
# Validate data and chunk it for ingesting in batches
validated_data = validate(data, exclude_none=False)
chunked_data = chunk_iterable(validated_data, chunksize=CHUNKSIZE)
for chunk in chunked_data:
try:
ids = [item["id"] for item in chunk]
print(f"Finished indexing ID range {min(ids)}-{max(ids)}")
await helpers.async_bulk(elastic_client, chunk, index=INDEX_ALIAS)
except Exception as e:
print(f"{e}: Error while indexing ID range {min(ids)}-{max(ids)}")
# Close AsyncElasticsearch client
await elastic_client.close()

Expand Down Expand Up @@ -182,4 +173,4 @@ async def main(data: list[JsonBlob], index: str) -> None:

# Run main async event loop
if data:
asyncio.run(main(data, INDEX_ALIAS))
asyncio.run(main(data))

0 comments on commit 0e0884a

Please sign in to comment.