From 4855298f2a4fca57297e4bcefd3cfb1839c93166 Mon Sep 17 00:00:00 2001 From: Szymon Palucha Date: Fri, 15 Nov 2024 15:52:32 +0000 Subject: [PATCH] refactor: separate out parquet to kazu docs conversion * now annotate_with_llm takes kazu docs as an input * deals with duplicate documents rather than just blindly overwriting them --- kazu/steps/ner/llm_ner.py | 18 +- scripts/examples/annotate_with_llm.py | 158 +++------------- .../conf/annotate_with_llm/default.yaml | 8 +- scripts/examples/conf/config.yaml | 3 +- .../convert_parquet_to_kazu_docs/default.yaml | 6 + .../examples/convert_parquet_to_kazu_docs.py | 179 ++++++++++++++++++ 6 files changed, 226 insertions(+), 146 deletions(-) create mode 100644 scripts/examples/conf/convert_parquet_to_kazu_docs/default.yaml create mode 100644 scripts/examples/convert_parquet_to_kazu_docs.py diff --git a/kazu/steps/ner/llm_ner.py b/kazu/steps/ner/llm_ner.py index 7152c595..8ed85b98 100644 --- a/kazu/steps/ner/llm_ner.py +++ b/kazu/steps/ner/llm_ner.py @@ -1,16 +1,24 @@ import json import logging from enum import auto -from typing import Iterable, Optional, Any, cast, Protocol +from typing import Any, Iterable, Optional, Protocol, cast import ahocorasick import vertexai -from kazu.data import Document, Entity, Section, MentionConfidence, AutoNameEnum +from vertexai.generative_models import SafetySetting +from vertexai.generative_models._generative_models import ( + GenerativeModel, + SafetySettingsType, +) + +from kazu.data import AutoNameEnum, Document, Entity, MentionConfidence, Section from kazu.steps import Step, document_iterating_step -from kazu.utils.spacy_pipeline import BASIC_PIPELINE_NAME, SpacyPipelines, basic_spacy_pipeline +from kazu.utils.spacy_pipeline import ( + BASIC_PIPELINE_NAME, + SpacyPipelines, + basic_spacy_pipeline, +) from kazu.utils.utils import word_is_valid -from vertexai.generative_models import SafetySetting -from vertexai.generative_models._generative_models import SafetySettingsType, GenerativeModel logger = logging.getLogger(__name__) diff --git a/scripts/examples/annotate_with_llm.py b/scripts/examples/annotate_with_llm.py index 331bb6c2..94c1ac5f 100644 --- a/scripts/examples/annotate_with_llm.py +++ b/scripts/examples/annotate_with_llm.py @@ -1,23 +1,18 @@ import logging -import math -import random from pathlib import Path -from typing import List, Iterable, Protocol, Optional, Any, cast +from typing import Any, Iterable, List, Optional, cast import hydra +import ray from hydra.utils import instantiate from omegaconf import DictConfig -import ray from ray import ObjectRef -from ray.util.queue import Queue, Empty, Full +from ray.util.queue import Empty, Full, Queue from tqdm import tqdm -from pyarrow import parquet, Table -from kazu.utils.utils import PathLike, as_path -from kazu.utils.constants import HYDRA_VERSION_BASE +from kazu.data import PROCESSING_EXCEPTION, Document from kazu.pipeline import Pipeline -from kazu.data import Document, PROCESSING_EXCEPTION, Section - +from kazu.utils.constants import HYDRA_VERSION_BASE logger = logging.getLogger(__name__) @@ -34,127 +29,6 @@ def chunks_with_index(lst: List[Any], n: int, index: int) -> List[Any]: return partitions[index] -class DocumentLoader(Protocol): - """Abstraction to load documents from a source, and converts them into - :class:`.Document`.""" - - def load(self) -> Iterable[List[Document]]: - """Convert documents from a source into :class:`.Document`, and yield a list.""" - ... 
- - def batch_size(self) -> int: - """Number of documents produced per batch.""" - ... - - def total_documents(self) -> Optional[int]: - """Total Documents in this data source, if known.""" - ... - - def total_batches_if_available(self) -> Optional[int]: - maybe_total = self.total_documents() - if maybe_total is not None: - total = int(maybe_total / self.batch_size()) - else: - total = None - return total - - -class ParquetDocumentLoader(DocumentLoader): - def __init__( - self, - batch_size: int, - source_dir: PathLike, - randomise_processing_order: bool = True, - ): - """ - - :param batch_size: number of documents to produce per batch - :param source_dir: Path to parquet dataset. This should have three columns: - id: a globally unique id for the document, ids: a dict or list of any other ids - associated with the document, sections: an array of structs with the - fields: - {section:, - text:, - subSection:} - :param randomise_processing_order: should parquet files be processed in a random order? - """ - self.randomise_processing_order = randomise_processing_order - self._batch_size = batch_size - self.logger = logging.getLogger(__name__) - self.source_dir = as_path(source_dir) - self.files_to_process = self._get_file_list() - self.logger.info(f"{len(self.files_to_process)} file to do this batch run") - - def batch_size(self) -> int: - return self._batch_size - - def _list_files_in_dir(self, dir: Path) -> List[Path]: - paths = [] - for path in dir.iterdir(): - # ignore any non-parquet files - if path.suffix == ".parquet": - paths.append(path) - return sorted(paths) - - def _get_file_list(self) -> List[Path]: - self.logger.info(f"selecting all files from {self.source_dir}") - todo_as_paths = self._list_files_in_dir(self.source_dir) - if self.randomise_processing_order: - random.shuffle(todo_as_paths) - else: - todo_as_paths.sort() - return todo_as_paths - - def _table_slice_to_docs(self, table: Table) -> List[Document]: - docs = [] - - for as_dict in table.select(["id", "sections"]).to_pylist(): - sections = as_dict["sections"] - idx = as_dict["id"] - kazu_sections = [] - for section in sections: - kazu_sections.append( - Section( - name=section["section"], - text=section["text"], - metadata={"subSection": section.get("subSection")}, - ) - ) - docs.append(Document(idx=idx, sections=kazu_sections)) - return docs - - def load(self) -> Iterable[List[Document]]: - for target_file in self.files_to_process: - for docs in self._subpartition_parquet(target_file): - yield docs - - def total_documents(self) -> int: - table = parquet.read_table(self.source_dir, columns=["id"]) - return cast(int, table.shape[0]) - - def _subpartition_parquet(self, file_path: Path) -> Iterable[List[Document]]: - table = parquet.read_table(file_path) - if table.shape[0] == 0: - self.logger.debug( - f"no rows detected/required in file {file_path}. 
Nothing to partition" - ) - else: - partition_count = math.ceil(table.shape[0] / self.batch_size()) - self.logger.info( - f"dataframe will yield {partition_count} partitions ({table.shape[0]} rows)" - ) - offset_index = 0 - while True: - slice = table.slice(offset_index, self.batch_size()) - if slice.shape[0] == 0: - self.logger.info(f"no more slices for {file_path}") - break - else: - docs = self._table_slice_to_docs(slice) - yield docs - offset_index = offset_index + self.batch_size() - - class RayPipelineQueueWorker: """Reads Documents from a queue, processes with a pipeline and writes to another queue.""" @@ -208,7 +82,8 @@ def __init__( self.cfg = cfg self.worker_count = cfg.worker_count ray.init(num_cpus=self.worker_count) - self.loader: ParquetDocumentLoader = instantiate(cfg.ParquetDocumentLoader) + self.batch_size = cfg.batch_size + self.source_dir = Path(cfg.source_dir) self.out_dir = Path(cfg.out_dir) self.out_dir.mkdir(exist_ok=True) self.failed_dir = Path(cfg.failed_dir) @@ -249,8 +124,8 @@ def start(self) -> Iterable[List[Document]]: docs_wanted = 0 responses = 0 log_count = 0 - for i, docs in enumerate( - tqdm(self.loader.load(), smoothing=0.1, total=self.loader.total_batches_if_available()) + for _, docs in enumerate( + tqdm(self.load_documents_in_batches(), smoothing=0.1, total=len(self.get_docs_paths)) ): docs_wanted += len(docs) unprocessed = self.check_doc_not_already_processed(docs) @@ -271,13 +146,26 @@ def start(self) -> Iterable[List[Document]]: responses += len(result) yield result + @property + def get_docs_paths(self) -> List[Path]: + return list(self.source_dir.glob("*.json")) + + def load_documents_in_batches(self) -> Iterable[List[Document]]: + for i in range(0, len(self.get_docs_paths), self.batch_size): + paths_batch = self.get_docs_paths[i : i + self.batch_size] + doc_batch = [] + for doc_path in paths_batch: + with doc_path.open(mode="r") as f: + doc_batch.append(Document.from_json(f.read())) + yield doc_batch + @hydra.main(version_base=HYDRA_VERSION_BASE, config_path="conf", config_name="config") def run_pipe(cfg: DictConfig) -> None: job = RayBatchRunner(cfg.annotate_with_llm) success_count = 0 fail_count = 0 - for docs in tqdm(job.start(), total=job.loader.total_batches_if_available()): + for docs in tqdm(job.start(), total=len(job.get_docs_paths)): for doc in docs: out_path = job.out_dir.joinpath(f"{doc.idx}.json") failed_path = job.failed_dir.joinpath(f"{doc.idx}.json") diff --git a/scripts/examples/conf/annotate_with_llm/default.yaml b/scripts/examples/conf/annotate_with_llm/default.yaml index 87da832a..aeb950c3 100644 --- a/scripts/examples/conf/annotate_with_llm/default.yaml +++ b/scripts/examples/conf/annotate_with_llm/default.yaml @@ -59,7 +59,7 @@ pipeline: - entity_match - entity_class - top_level_category - location: ??? + location: us-central1 - _target_: kazu.steps.other.cleanup.CleanupStep cleanup_actions: - _target_: kazu.steps.other.cleanup.EntityFilterCleanupAction @@ -188,10 +188,8 @@ pipeline: min_len: 2 failure_handler: - _target_: kazu.pipeline.FailedDocsLogHandler +batch_size: 4 worker_count: 2 -ParquetDocumentLoader: - _target_: __main__.ParquetDocumentLoader - source_dir: ??? - batch_size: 1 +source_dir: ??? out_dir: ??? failed_dir: ??? 
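With the ParquetDocumentLoader removed from this script, annotate_with_llm.py now expects source_dir to hold one serialised kazu Document per <idx>.json file, which it globs and loads in batches of batch_size. The converter script added below produces exactly this layout; for a quick smoke test the input can also be hand-built, as in this minimal sketch (the directory name and document contents are illustrative only, assuming Document.to_json round-trips as used in load_documents_in_batches):

from pathlib import Path

from kazu.data import Document, Section

# hypothetical input directory; point annotate_with_llm's source_dir at it
source_dir = Path("smoke_test_docs")
source_dir.mkdir(exist_ok=True)

docs = [
    Document(idx="doc-1", sections=[Section(name="abstract", text="EGFR is frequently mutated in NSCLC.")]),
    Document(idx="doc-2", sections=[Section(name="body", text="Imatinib inhibits BCR-ABL.")]),
]

for doc in docs:
    # the script globs *.json, so each Document is written to <idx>.json
    (source_dir / f"{doc.idx}.json").write_text(doc.to_json())

The resulting directory is then wired in via annotate_with_llm.source_dir (with out_dir and failed_dir set alongside it), either in the config or as Hydra command-line overrides.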
diff --git a/scripts/examples/conf/config.yaml b/scripts/examples/conf/config.yaml index f3119fcd..43a96d2c 100644 --- a/scripts/examples/conf/config.yaml +++ b/scripts/examples/conf/config.yaml @@ -1,7 +1,8 @@ defaults: - multilabel_ner_training: default - annotate_with_llm: default - - _self_ # see https://hydra.cc/docs/1.2/upgrades/1.0_to_1.1/default_composition_order/ + - convert_parquet_to_kazu_docs: default + - _self_ # see https://hydra.cc/docs/1.2/upgrades/1.0_to_1.1/default_composition_order/ # we set certain env vars here for things that are statically initialised hydra: diff --git a/scripts/examples/conf/convert_parquet_to_kazu_docs/default.yaml b/scripts/examples/conf/convert_parquet_to_kazu_docs/default.yaml new file mode 100644 index 00000000..898874cb --- /dev/null +++ b/scripts/examples/conf/convert_parquet_to_kazu_docs/default.yaml @@ -0,0 +1,6 @@ +_convert_: "all" +ParquetDocumentLoader: + _target_: __main__.ParquetDocumentLoader + source_dir: /Users/krmr097/code/KAZU/training/raw/percentile=1 + batch_size: 1 +out_dir: /Users/krmr097/code/KAZU/training/kazu_docs diff --git a/scripts/examples/convert_parquet_to_kazu_docs.py b/scripts/examples/convert_parquet_to_kazu_docs.py new file mode 100644 index 00000000..ff687215 --- /dev/null +++ b/scripts/examples/convert_parquet_to_kazu_docs.py @@ -0,0 +1,179 @@ +import logging +import math +import random +from collections import defaultdict +from pathlib import Path +from typing import Iterable, List, Optional, Protocol, cast + +import hydra +import tqdm +from hydra.utils import instantiate +from omegaconf import DictConfig +from pyarrow import Table, parquet + +from kazu.data import Document, Section +from kazu.utils.constants import HYDRA_VERSION_BASE +from kazu.utils.utils import PathLike, as_path + + +class DocumentLoader(Protocol): + """Abstraction to load documents from a source, and converts them into + :class:`.Document`.""" + + def load(self) -> Iterable[List[Document]]: + """Convert documents from a source into :class:`.Document`, and yield a list.""" + ... + + def batch_size(self) -> int: + """Number of documents produced per batch.""" + ... + + def total_documents(self) -> Optional[int]: + """Total Documents in this data source, if known.""" + ... + + def total_batches_if_available(self) -> Optional[int]: + maybe_total = self.total_documents() + if maybe_total is not None: + total = int(maybe_total / self.batch_size()) + else: + total = None + return total + + +class ParquetDocumentLoader(DocumentLoader): + def __init__( + self, + batch_size: int, + source_dir: PathLike, + randomise_processing_order: bool = True, + ): + """ + + :param batch_size: number of documents to produce per batch + :param source_dir: Path to parquet dataset. This should have three columns: + id: a globally unique id for the document, ids: a dict or list of any other ids + associated with the document, sections: an array of structs with the + fields: + {section:, + text:, + subSection:} + :param randomise_processing_order: should parquet files be processed in a random order? 
+ """ + self.randomise_processing_order = randomise_processing_order + self._batch_size = batch_size + self.logger = logging.getLogger(__name__) + self.source_dir = as_path(source_dir) + self.files_to_process = self._get_file_list() + self.logger.info(f"{len(self.files_to_process)} file to do this batch run") + + def batch_size(self) -> int: + return self._batch_size + + def _list_files_in_dir(self, dir: Path) -> List[Path]: + paths = [] + for path in dir.iterdir(): + # ignore any non-parquet files + if path.suffix == ".parquet": + paths.append(path) + return sorted(paths) + + def _get_file_list(self) -> List[Path]: + self.logger.info(f"selecting all files from {self.source_dir}") + todo_as_paths = self._list_files_in_dir(self.source_dir) + if self.randomise_processing_order: + random.shuffle(todo_as_paths) + else: + todo_as_paths.sort() + return todo_as_paths + + def _table_slice_to_docs(self, table: Table) -> List[Document]: + docs = [] + + for as_dict in table.select(["id", "sections"]).to_pylist(): + sections = as_dict["sections"] + idx = as_dict["id"] + kazu_sections = [] + for section in sections: + kazu_sections.append( + Section( + name=section["section"], + text=section["text"], + metadata={"subSection": section.get("subSection")}, + ) + ) + docs.append(Document(idx=idx, sections=kazu_sections)) + return docs + + def load(self) -> Iterable[List[Document]]: + for target_file in self.files_to_process: + for docs in self._subpartition_parquet(target_file): + yield docs + + def total_documents(self) -> int: + table = parquet.read_table(self.source_dir, columns=["id"]) + return cast(int, table.shape[0]) + + def _subpartition_parquet(self, file_path: Path) -> Iterable[List[Document]]: + table = parquet.read_table(file_path) + if table.shape[0] == 0: + self.logger.debug( + f"no rows detected/required in file {file_path}. Nothing to partition" + ) + else: + partition_count = math.ceil(table.shape[0] / self.batch_size()) + self.logger.info( + f"dataframe will yield {partition_count} partitions ({table.shape[0]} rows)" + ) + offset_index = 0 + while True: + slice = table.slice(offset_index, self.batch_size()) + if slice.shape[0] == 0: + self.logger.info(f"no more slices for {file_path}") + break + else: + docs = self._table_slice_to_docs(slice) + yield docs + offset_index = offset_index + self.batch_size() + + +def merge_documents(docs: set[Document]) -> Document: + # Assuming these docs are near duplicates, we can keep the one with the most sections for now + return max(docs, key=lambda x: len(x.sections)) + + +@hydra.main(version_base=HYDRA_VERSION_BASE, config_path="conf", config_name="config") +def main(cfg: DictConfig) -> None: + document_loader: ParquetDocumentLoader = instantiate( + cfg.convert_parquet_to_kazu_docs.ParquetDocumentLoader + ) + output_dir = Path(cfg.convert_parquet_to_kazu_docs.out_dir) + output_dir.mkdir(exist_ok=True) + + docs_by_idx = defaultdict(set) + for docs in tqdm.tqdm( + document_loader.load(), total=document_loader.total_batches_if_available() + ): + for doc in docs: + docs_by_idx[doc.idx].add(doc) + + loaded_docs = [] + for idx, duplicate_docs in docs_by_idx.items(): + if len(duplicate_docs) == 1: + loaded_docs.append(duplicate_docs.pop()) + else: + print( + f"Found {len(duplicate_docs)} duplicates for {idx}. Need to merge them to avoid duplicates." 
+ ) + merged_doc = merge_documents(duplicate_docs) + loaded_docs.append(merged_doc) + + print(f"After dropping duplicates left with {len(loaded_docs)} documents to save.") + for doc in loaded_docs: + file_path = output_dir / f"{doc.idx}.json" + with file_path.open("w") as f: + f.write(doc.to_json()) + + +if __name__ == "__main__": + main()