Commit
Merge pull request #19 from prrao87/srsly
Use srsly for JSON serialization
prrao87 authored Apr 21, 2023
2 parents 23529c7 + 680cfe7 commit def1585
Showing 10 changed files with 234 additions and 331 deletions.
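
For context, this change swaps hand-rolled json + zipfile handling for srsly's one-call gzipped JSONL helpers. Below is a minimal sketch of that round trip, assuming srsly is installed; the file name and records are illustrative only and not part of this commit.

import srsly

records = [{"id": 1, "points": 87}, {"id": 2, "points": 91}]

# Write the records as gzip-compressed, line-delimited JSON in a single call
srsly.write_gzip_jsonl("example.jsonl.gz", records)

# read_gzip_jsonl returns a generator of dicts, so materialize it when a list is needed
loaded = list(srsly.read_gzip_jsonl("example.jsonl.gz"))
assert loaded == records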
3 changes: 2 additions & 1 deletion .gitignore
@@ -132,5 +132,6 @@ dmypy.json
.DS_Store

# data
*.jsonl
data/*.json
data/*.jsonl
*/*/meili_data
58 changes: 19 additions & 39 deletions data/convert.py
@@ -1,46 +1,26 @@
import json
import zipfile
from typing import Any, Iterator
"""
Run `pip install srsly` to use this script
This script converts the JSON data file from https://www.kaggle.com/datasets/zynicide/wine-reviews
to a .gzip line-delimited (.jsonl) file for use downstream with the databases in question.
Full credit to the original author, @zynicide, on Kaggle, for the data.
"""
from pathlib import Path
from typing import Any

import srsly

JsonBlob = dict[str, Any]


def read_data(filename: str) -> list[JsonBlob]:
with open(filename) as f:
data = json.load(f)
for idx, item in enumerate(data, 1):
item["id"] = idx
return data


def chunk_iterable(item_list: list[JsonBlob], chunksize: int) -> Iterator[tuple[JsonBlob, ...]]:
"""
Break a large iterable into an iterable of smaller iterables of size `chunksize`
"""
for i in range(0, len(item_list), chunksize):
yield tuple(item_list[i : i + chunksize])


def write_chunked_data(item_list: list[JsonBlob], output_name: str, chunksize: int = 5000) -> None:
"""
Write data to a zip file in chunks so that we don't dump all data into a single huge JSON file
"""
zipfilename = f"{output_name}-jsonl.zip"
with zipfile.ZipFile(
zipfilename,
"w",
compression=zipfile.ZIP_DEFLATED,
compresslevel=5,
) as zipf:
chunked_data = chunk_iterable(item_list, chunksize)
for num, chunk in enumerate(chunked_data, 1):
filename = f"{output_name}-{num}.jsonl"
chunk_json = "\n".join(json.dumps(item) for item in chunk)
# Write the JSONL data into the specified filename *inside* the ZIP file
zipf.writestr(filename, data=chunk_json)
def convert_to_jsonl(filename: str) -> None:
data = srsly.read_json(filename)
# Add an `id` field to the start of each dict item so we have a primary key for indexing
new_data = [{"id": idx, **item} for idx, item in enumerate(data, 1)]
srsly.write_gzip_jsonl(f"{Path(filename).stem}.jsonl.gz", new_data)


if __name__ == "__main__":
# Download the JSON data file from https://www.kaggle.com/datasets/zynicide/wine-reviews
data = read_data("winemag-data-130k-v2.json")
write_chunked_data(data, "winemag-data-130k-v2")
    # Download the JSON data file from https://www.kaggle.com/datasets/zynicide/wine-reviews
convert_to_jsonl("winemag-data-130k-v2.json")
Binary file not shown.
3 changes: 2 additions & 1 deletion dbs/elasticsearch/requirements.txt
@@ -4,4 +4,5 @@ fastapi>=0.95.0, <1.0.0
httpx>=0.24.0
aiohttp>=3.8.4
uvicorn>=0.21.0, <1.0.0
python-dotenv>=1.0.0
python-dotenv>=1.0.0
srsly>=2.4.6
120 changes: 43 additions & 77 deletions dbs/elasticsearch/scripts/bulk_index.py
@@ -1,14 +1,13 @@
import argparse
import asyncio
import glob
import json
import os
import sys
import warnings
import zipfile
from pathlib import Path
from typing import Any
from typing import Any, Iterator

import srsly
from dotenv import load_dotenv
from elasticsearch import AsyncElasticsearch, helpers
from pydantic.main import ModelMetaclass
@@ -28,45 +27,30 @@ class FileNotFoundError(Exception):
# --- Blocking functions ---


def get_json_files(file_prefix: str, file_path: Path) -> list[str]:
"""Get all line-delimited json files (.jsonl) from a directory with a given prefix"""
files = sorted(glob.glob(f"{file_path}/{file_prefix}*.jsonl"))
if not files:
raise FileNotFoundError(
f"No .jsonl files with prefix `{file_prefix}` found in `{file_path}`"
)
return files


def clean_directory(dirname: Path) -> None:
"""Clean up existing files to avoid conflicts"""
if Path(dirname).exists():
for f in Path(dirname).glob("*"):
if f.is_file():
f.unlink()


def extract_json_from_zip(data_path: Path, file_path: Path) -> None:
def chunk_iterable(item_list: list[JsonBlob], chunksize: int) -> Iterator[tuple[JsonBlob, ...]]:
"""
Extract .jsonl files from zip file and save them in `file_path`
Break a large iterable into an iterable of smaller iterables of size `chunksize`
"""
clean_directory(file_path)
zipfiles = sorted(glob.glob(f"{str(data_path)}/*.zip"))
for file in zipfiles:
with zipfile.ZipFile(file, "r") as zipf:
for fn in zipf.infolist():
fn.filename = Path(fn.filename).name
zipf.extract(fn, file_path)


def read_jsonl_from_file(filename: str) -> list[JsonBlob]:
with open(filename, "r") as f:
data = [json.loads(line.strip()) for line in f.readlines()]
for i in range(0, len(item_list), chunksize):
yield tuple(item_list[i : i + chunksize])


def get_json_data(data_dir: Path, filename: str) -> Iterator[JsonBlob]:
    """Read the gzipped, line-delimited JSON (.jsonl.gz) data file as dicts"""
    file_path = data_dir / filename
    if not file_path.is_file():
        raise FileNotFoundError(f"No valid .jsonl.gz file found at `{file_path}`")
    data = srsly.read_gzip_jsonl(file_path)
    return data


def validate(
data: list[JsonBlob],
    data: tuple[JsonBlob, ...],
model: ModelMetaclass,
exclude_none: bool = False,
) -> list[JsonBlob]:
@@ -95,7 +79,7 @@ async def get_elastic_client() -> AsyncElasticsearch:
return elastic_client


async def create_index(mappings_path: Path, client: AsyncElasticsearch) -> None:
async def create_index(client: AsyncElasticsearch, mappings_path: Path) -> None:
"""Create an index associated with an alias in ElasticSearch"""
with open(mappings_path, "rb") as f:
config = json.load(f)
@@ -135,66 +119,48 @@ async def bulk_index_wines_to_elastic(
print(f"A document failed to index: {response}")


async def main(files: list[str]) -> None:
async def main(chunked_data: Iterator[tuple[JsonBlob, ...]]) -> None:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
elastic_client = await get_elastic_client()
INDEX_ALIAS = os.environ.get("ELASTIC_INDEX_ALIAS")
        if not await elastic_client.indices.exists_alias(name=INDEX_ALIAS):
print(f"Did not find index {INDEX_ALIAS} in db, creating index...\n")
await create_index(Path("mapping/mapping.json"), elastic_client)
for file in files:
data = read_jsonl_from_file(file)
validated_data = validate(data, Wine)
await create_index(elastic_client, Path("mapping/mapping.json"))
counter = 0
for chunk in chunked_data:
validated_data = validate(chunk, Wine)
counter += len(validated_data)
ids = [item["id"] for item in validated_data]
try:
await bulk_index_wines_to_elastic(elastic_client, INDEX_ALIAS, validated_data)
print(f"Indexed {Path(file).name} to db")
print(f"Indexed {counter} items")
except Exception as e:
print(f"{e}: Failed to index {Path(file).name} to db")
print(f"{e}: Failed to index items in the ID range {min(ids)}-{max(ids)} to db")
# Close AsyncElasticsearch client
await elastic_client.close()
print("Finished execution")


if __name__ == "__main__":
# fmt: off
parser = argparse.ArgumentParser("Bulk index database from the wine reviews JSONL data")
parser.add_argument(
"--limit",
type=int,
default=0,
help="Limit the size of the dataset to load for testing purposes",
)
parser.add_argument(
"--refresh",
action="store_true",
help="Refresh zip file data by clearing existing directory & extracting them again",
)
parser.add_argument(
"--filename",
type=str,
default="winemag-data-130k-v2-jsonl.zip",
help="Name of the JSONL zip file to use",
)
parser.add_argument("--limit", type=int, default=0, help="Limit the size of the dataset to load for testing purposes")
parser.add_argument("--chunksize", type=int, default=10_000, help="Size of each chunk to break the dataset into before processing")
parser.add_argument("--filename", type=str, default="winemag-data-130k-v2.jsonl.gz", help="Name of the JSONL zip file to use")
args = vars(parser.parse_args())
# fmt: on

LIMIT = args["limit"]
DATA_DIR = Path(__file__).parents[3] / "data"
ZIPFILE = DATA_DIR / args["filename"]
# Get file path for unzipped files
filename = Path(args["filename"]).stem
FILE_PATH = DATA_DIR / filename

# Extract JSONL files from zip files if `--refresh` flag is set
if args["refresh"]:
# Extract all json files from zip files from their parent directory and save them in `parent_dir/data`.
extract_json_from_zip(DATA_DIR, FILE_PATH)

files = get_json_files("winemag-data", FILE_PATH)
assert files, f"No files found in {FILE_PATH}"
FILENAME = args["filename"]
CHUNKSIZE = args["chunksize"]

data = list(get_json_data(DATA_DIR, FILENAME))
if LIMIT > 0:
files = files[:LIMIT]
data = data[:LIMIT]

chunked_data = chunk_iterable(data, CHUNKSIZE)

# Run main async event loop
if files:
asyncio.run(main(files))
asyncio.run(main(chunked_data))
print("Finished execution!")
3 changes: 2 additions & 1 deletion dbs/meilisearch/requirements.txt
@@ -4,4 +4,5 @@ fastapi>=0.95.0, <1.0.0
httpx>=0.24.0
aiohttp>=3.8.4
uvicorn>=0.21.0, <1.0.0
python-dotenv>=1.0.0
python-dotenv>=1.0.0
srsly>=2.4.6
