refactor: separate out parquet to kazu docs conversion
* annotate_with_llm now takes kazu docs as input
* handles duplicate documents rather than just blindly overwriting them
paluchasz committed Nov 15, 2024
1 parent 1836f1d commit 4855298
Showing 6 changed files with 226 additions and 146 deletions.
18 changes: 13 additions & 5 deletions kazu/steps/ner/llm_ner.py
@@ -1,16 +1,24 @@
import json
import logging
from enum import auto
from typing import Iterable, Optional, Any, cast, Protocol
from typing import Any, Iterable, Optional, Protocol, cast

import ahocorasick
import vertexai
from kazu.data import Document, Entity, Section, MentionConfidence, AutoNameEnum
from vertexai.generative_models import SafetySetting
from vertexai.generative_models._generative_models import (
GenerativeModel,
SafetySettingsType,
)

from kazu.data import AutoNameEnum, Document, Entity, MentionConfidence, Section
from kazu.steps import Step, document_iterating_step
from kazu.utils.spacy_pipeline import BASIC_PIPELINE_NAME, SpacyPipelines, basic_spacy_pipeline
from kazu.utils.spacy_pipeline import (
BASIC_PIPELINE_NAME,
SpacyPipelines,
basic_spacy_pipeline,
)
from kazu.utils.utils import word_is_valid
from vertexai.generative_models import SafetySetting
from vertexai.generative_models._generative_models import SafetySettingsType, GenerativeModel

logger = logging.getLogger(__name__)

158 changes: 23 additions & 135 deletions scripts/examples/annotate_with_llm.py
@@ -1,23 +1,18 @@
import logging
import math
import random
from pathlib import Path
from typing import List, Iterable, Protocol, Optional, Any, cast
from typing import Any, Iterable, List, Optional, cast

import hydra
import ray
from hydra.utils import instantiate
from omegaconf import DictConfig
import ray
from ray import ObjectRef
from ray.util.queue import Queue, Empty, Full
from ray.util.queue import Empty, Full, Queue
from tqdm import tqdm
from pyarrow import parquet, Table

from kazu.utils.utils import PathLike, as_path
from kazu.utils.constants import HYDRA_VERSION_BASE
from kazu.data import PROCESSING_EXCEPTION, Document
from kazu.pipeline import Pipeline
from kazu.data import Document, PROCESSING_EXCEPTION, Section

from kazu.utils.constants import HYDRA_VERSION_BASE

logger = logging.getLogger(__name__)

@@ -34,127 +29,6 @@ def chunks_with_index(lst: List[Any], n: int, index: int) -> List[Any]:
return partitions[index]


class DocumentLoader(Protocol):
"""Abstraction to load documents from a source, and converts them into
:class:`.Document`."""

def load(self) -> Iterable[List[Document]]:
"""Convert documents from a source into :class:`.Document`, and yield a list."""
...

def batch_size(self) -> int:
"""Number of documents produced per batch."""
...

def total_documents(self) -> Optional[int]:
"""Total Documents in this data source, if known."""
...

def total_batches_if_available(self) -> Optional[int]:
maybe_total = self.total_documents()
if maybe_total is not None:
total = int(maybe_total / self.batch_size())
else:
total = None
return total


class ParquetDocumentLoader(DocumentLoader):
def __init__(
self,
batch_size: int,
source_dir: PathLike,
randomise_processing_order: bool = True,
):
"""
:param batch_size: number of documents to produce per batch
:param source_dir: Path to parquet dataset. This should have three columns:
id: a globally unique id for the document, ids: a dict or list of any other ids
associated with the document, sections: an array of structs with the
fields:
{section:<type(str) name of section>,
text:<type(str) the text to process>,
subSection:<type(str) optional additional string of section information>}
:param randomise_processing_order: should parquet files be processed in a random order?
"""
self.randomise_processing_order = randomise_processing_order
self._batch_size = batch_size
self.logger = logging.getLogger(__name__)
self.source_dir = as_path(source_dir)
self.files_to_process = self._get_file_list()
self.logger.info(f"{len(self.files_to_process)} file to do this batch run")

def batch_size(self) -> int:
return self._batch_size

def _list_files_in_dir(self, dir: Path) -> List[Path]:
paths = []
for path in dir.iterdir():
# ignore any non-parquet files
if path.suffix == ".parquet":
paths.append(path)
return sorted(paths)

def _get_file_list(self) -> List[Path]:
self.logger.info(f"selecting all files from {self.source_dir}")
todo_as_paths = self._list_files_in_dir(self.source_dir)
if self.randomise_processing_order:
random.shuffle(todo_as_paths)
else:
todo_as_paths.sort()
return todo_as_paths

def _table_slice_to_docs(self, table: Table) -> List[Document]:
docs = []

for as_dict in table.select(["id", "sections"]).to_pylist():
sections = as_dict["sections"]
idx = as_dict["id"]
kazu_sections = []
for section in sections:
kazu_sections.append(
Section(
name=section["section"],
text=section["text"],
metadata={"subSection": section.get("subSection")},
)
)
docs.append(Document(idx=idx, sections=kazu_sections))
return docs

def load(self) -> Iterable[List[Document]]:
for target_file in self.files_to_process:
for docs in self._subpartition_parquet(target_file):
yield docs

def total_documents(self) -> int:
table = parquet.read_table(self.source_dir, columns=["id"])
return cast(int, table.shape[0])

def _subpartition_parquet(self, file_path: Path) -> Iterable[List[Document]]:
table = parquet.read_table(file_path)
if table.shape[0] == 0:
self.logger.debug(
f"no rows detected/required in file {file_path}. Nothing to partition"
)
else:
partition_count = math.ceil(table.shape[0] / self.batch_size())
self.logger.info(
f"dataframe will yield {partition_count} partitions ({table.shape[0]} rows)"
)
offset_index = 0
while True:
slice = table.slice(offset_index, self.batch_size())
if slice.shape[0] == 0:
self.logger.info(f"no more slices for {file_path}")
break
else:
docs = self._table_slice_to_docs(slice)
yield docs
offset_index = offset_index + self.batch_size()
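# --- Editor's aside, not part of this commit: the ParquetDocumentLoader docstring
# --- above describes the expected parquet layout (columns id, ids, and sections,
# --- where sections is an array of {section, text, subSection} structs). As a
# --- minimal sketch, a dataset matching that schema could be written with pyarrow
# --- like this; the example values are invented:
import pyarrow as pa
from pyarrow import parquet

rows = [
    {
        "id": "doc-001",  # globally unique document id
        "ids": {"pmid": "12345"},  # any other ids associated with the document
        "sections": [
            {"section": "title", "text": "Example title", "subSection": None},
            {"section": "abstract", "text": "Example abstract text.", "subSection": "background"},
        ],
    }
]
table = pa.Table.from_pylist(rows)  # pyarrow infers the list<struct<...>> type
parquet.write_table(table, "example.parquet")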


class RayPipelineQueueWorker:
"""Reads Documents from a queue, processes with a pipeline and writes to another
queue."""
@@ -208,7 +82,8 @@ def __init__(
self.cfg = cfg
self.worker_count = cfg.worker_count
ray.init(num_cpus=self.worker_count)
self.loader: ParquetDocumentLoader = instantiate(cfg.ParquetDocumentLoader)
self.batch_size = cfg.batch_size
self.source_dir = Path(cfg.source_dir)
self.out_dir = Path(cfg.out_dir)
self.out_dir.mkdir(exist_ok=True)
self.failed_dir = Path(cfg.failed_dir)
@@ -249,8 +124,8 @@ def start(self) -> Iterable[List[Document]]:
docs_wanted = 0
responses = 0
log_count = 0
for i, docs in enumerate(
tqdm(self.loader.load(), smoothing=0.1, total=self.loader.total_batches_if_available())
for _, docs in enumerate(
tqdm(self.load_documents_in_batches(), smoothing=0.1, total=len(self.get_docs_paths))
):
docs_wanted += len(docs)
unprocessed = self.check_doc_not_already_processed(docs)
@@ -271,13 +146,26 @@ def start(self) -> Iterable[List[Document]]:
responses += len(result)
yield result

@property
def get_docs_paths(self) -> List[Path]:
return list(self.source_dir.glob("*.json"))

def load_documents_in_batches(self) -> Iterable[List[Document]]:
for i in range(0, len(self.get_docs_paths), self.batch_size):
paths_batch = self.get_docs_paths[i : i + self.batch_size]
doc_batch = []
for doc_path in paths_batch:
with doc_path.open(mode="r") as f:
doc_batch.append(Document.from_json(f.read()))
yield doc_batch


@hydra.main(version_base=HYDRA_VERSION_BASE, config_path="conf", config_name="config")
def run_pipe(cfg: DictConfig) -> None:
job = RayBatchRunner(cfg.annotate_with_llm)
success_count = 0
fail_count = 0
for docs in tqdm(job.start(), total=job.loader.total_batches_if_available()):
for docs in tqdm(job.start(), total=len(job.get_docs_paths)):
for doc in docs:
out_path = job.out_dir.joinpath(f"{doc.idx}.json")
failed_path = job.failed_dir.joinpath(f"{doc.idx}.json")
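Editor's aside: check_doc_not_already_processed is called in start() above, but its body falls outside the hunks shown in this diff. A minimal sketch consistent with the out_dir/failed_dir layout used in run_pipe, assuming a duplicate is any document whose output (or failure record) has already been written:

def check_doc_not_already_processed(self, docs: List[Document]) -> List[Document]:
    """Illustrative sketch only; the real implementation is not shown in this diff."""
    unprocessed = []
    for doc in docs:
        out_path = self.out_dir.joinpath(f"{doc.idx}.json")
        failed_path = self.failed_dir.joinpath(f"{doc.idx}.json")
        if out_path.exists() or failed_path.exists():
            continue  # already processed (or already failed); skip the duplicate
        unprocessed.append(doc)
    return unprocessed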
8 changes: 3 additions & 5 deletions scripts/examples/conf/annotate_with_llm/default.yaml
@@ -59,7 +59,7 @@ pipeline:
- entity_match
- entity_class
- top_level_category
location: ???
location: us-central1
- _target_: kazu.steps.other.cleanup.CleanupStep
cleanup_actions:
- _target_: kazu.steps.other.cleanup.EntityFilterCleanupAction
@@ -188,10 +188,8 @@ pipeline:
min_len: 2
failure_handler:
- _target_: kazu.pipeline.FailedDocsLogHandler
batch_size: 4
worker_count: 2
ParquetDocumentLoader:
_target_: __main__.ParquetDocumentLoader
source_dir: ???
batch_size: 1
source_dir: ???
out_dir: ???
failed_dir: ???
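Editor's aside: the `???` values above are mandatory Hydra fields and must be supplied at run time, typically as key=value overrides on the command line to annotate_with_llm.py. A rough sketch of the same thing done programmatically with Hydra's compose API, run from the scripts/examples directory (the paths are placeholders):

from hydra import compose, initialize

from kazu.utils.constants import HYDRA_VERSION_BASE

# config_path is relative to the calling module, so run this from scripts/examples
with initialize(version_base=HYDRA_VERSION_BASE, config_path="conf"):
    cfg = compose(
        config_name="config",
        overrides=[
            "annotate_with_llm.source_dir=/path/to/kazu_docs",
            "annotate_with_llm.out_dir=/path/to/llm_annotations",
            "annotate_with_llm.failed_dir=/path/to/llm_failed",
        ],
    )
print(cfg.annotate_with_llm.batch_size)  # 4, per the default above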
3 changes: 2 additions & 1 deletion scripts/examples/conf/config.yaml
@@ -1,7 +1,8 @@
defaults:
- multilabel_ner_training: default
- annotate_with_llm: default
- _self_ # see https://hydra.cc/docs/1.2/upgrades/1.0_to_1.1/default_composition_order/
- convert_parquet_to_kazu_docs: default
- _self_ # see https://hydra.cc/docs/1.2/upgrades/1.0_to_1.1/default_composition_order/

# we set certain env vars here for things that are statically initialised
hydra:
@@ -0,0 +1,6 @@
_convert_: "all"
ParquetDocumentLoader:
_target_: __main__.ParquetDocumentLoader
source_dir: /Users/krmr097/code/KAZU/training/raw/percentile=1
batch_size: 1
out_dir: /Users/krmr097/code/KAZU/training/kazu_docs
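Editor's aside: the name of this new file is cut off in this view; given the `convert_parquet_to_kazu_docs: default` entry added to config.yaml above, it is presumably that group's default.yaml under scripts/examples/conf. It would be consumed by the separated-out conversion script, which is also not shown in this diff. A minimal sketch of how such a script might use it — the script itself and the `Document.to_json()` serializer are assumptions, not confirmed by this commit:

from pathlib import Path

import hydra
from hydra.utils import instantiate
from omegaconf import DictConfig

from kazu.utils.constants import HYDRA_VERSION_BASE

# The _target_ of __main__.ParquetDocumentLoader implies the loader class
# (removed from annotate_with_llm.py above) now lives in the conversion script.


@hydra.main(version_base=HYDRA_VERSION_BASE, config_path="conf", config_name="config")
def run_conversion(cfg: DictConfig) -> None:
    conversion_cfg = cfg.convert_parquet_to_kazu_docs
    loader = instantiate(conversion_cfg.ParquetDocumentLoader)
    out_dir = Path(conversion_cfg.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    for batch in loader.load():
        for doc in batch:
            # assumes Document has a to_json() counterpart to the from_json() used above
            out_dir.joinpath(f"{doc.idx}.json").write_text(doc.to_json())


if __name__ == "__main__":
    run_conversion()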
