From 271654d4dcfcf9c3bc30282b353771f560498dd0 Mon Sep 17 00:00:00 2001
From: Luca Soldaini
Date: Wed, 8 Jan 2025 10:40:56 -0800
Subject: [PATCH] switched to skip_existing

---
 docs/taggers.md                |  2 +-
 python/dolma/cli/tagger.py     |  4 ++--
 python/dolma/cli/warc.py       |  4 ++--
 python/dolma/core/analyzer.py  |  2 +-
 python/dolma/core/parallel.py  | 12 ++++++------
 python/dolma/core/runtime.py   | 10 +++++-----
 python/dolma/warc/processor.py |  4 ++--
 tests/python/test_parallel.py  |  4 ++--
 tests/python/test_warc.py      |  2 +-
 9 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/docs/taggers.md b/docs/taggers.md
index f5e6ddbd..a6d39da3 100644
--- a/docs/taggers.md
+++ b/docs/taggers.md
@@ -26,7 +26,7 @@ The following parameters are supported either via CLI (e.g. `dolma tag --paramet
 |`taggers`|Yes| One or more taggers to run. |
 |`tagger_modules`|No| List of one or more Python modules to load taggers from. See section [*"Using Custom Taggers"*](#using-custom-taggers) for more details. |
 |`processes`|No| Number of processes to use for tagging. One process is used by default. |
-|`ignore_existing`|No| If true, ignore existing outputs and re-run the taggers. |
+|`skip_existing`|No| If true, ignore existing outputs and re-run the taggers. |
 |`dryrun`|No| If true, only print the configuration and exit without running the taggers. |
 |`debug`|No| If true, run in debug mode (i.e., disable parallelism). Useful when developing new taggers. |
 |`profile.enable`|No| If true, enable profiling. Useful when benchmarking taggers during development. |
diff --git a/python/dolma/cli/tagger.py b/python/dolma/cli/tagger.py
index 9982ec05..93af43ca 100644
--- a/python/dolma/cli/tagger.py
+++ b/python/dolma/cli/tagger.py
@@ -74,7 +74,7 @@ class TaggerConfig:
         default=1,
         help="Number of parallel processes to use.",
     )
-    ignore_existing: bool = field(
+    skip_existing: bool = field(
         default=False,
         help="Whether to ignore existing outputs and re-run the taggers.",
     )
@@ -132,7 +132,7 @@ def run(cls, parsed_config: TaggerConfig):
                 metadata=work_dirs.output,
                 taggers=taggers,
                 taggers_modules=parsed_config.tagger_modules,
-                ignore_existing=parsed_config.ignore_existing,
+                skip_existing=parsed_config.skip_existing,
                 num_processes=parsed_config.processes,
                 experiment=parsed_config.experiment,
                 debug=parsed_config.debug,
diff --git a/python/dolma/cli/warc.py b/python/dolma/cli/warc.py
index 9a8da2ea..ebed4ea6 100644
--- a/python/dolma/cli/warc.py
+++ b/python/dolma/cli/warc.py
@@ -39,7 +39,7 @@ class WarcExtractorConfig:
         default=1,
         help="Number of parallel processes to use.",
     )
-    ignore_existing: bool = field(
+    skip_existing: bool = field(
         default=False,
         help="Whether to ignore existing outputs and re-run the taggers.",
     )
@@ -107,7 +107,7 @@ def run(cls, parsed_config: WarcExtractorConfig):
                 destination=(destination[0] if len(destination) == 1 else destination),
                 metadata=work_dirs.output,
                 num_processes=parsed_config.processes,
-                ignore_existing=parsed_config.ignore_existing,
+                skip_existing=parsed_config.skip_existing,
                 debug=parsed_config.debug,
                 source_name=source_name,
                 pre_taggers=parsed_config.pre.taggers,
diff --git a/python/dolma/core/analyzer.py b/python/dolma/core/analyzer.py
index c8d542c0..09a0616b 100644
--- a/python/dolma/core/analyzer.py
+++ b/python/dolma/core/analyzer.py
@@ -324,7 +324,7 @@ def create_and_run_analyzer(
         metadata_prefix=metadata_path,
         debug=debug,
         seed=seed,
-        ignore_existing=True,
+        skip_existing=True,
         retries_on_error=0,
         num_processes=num_processes,
     )
diff --git a/python/dolma/core/parallel.py b/python/dolma/core/parallel.py
index 0bbfc75f..79af6ddf 100644
--- a/python/dolma/core/parallel.py
+++ b/python/dolma/core/parallel.py
@@ -69,7 +69,7 @@ def __init__(
         debug: bool = False,
         seed: int = 0,
         pbar_timeout: float = 1e-3,
-        ignore_existing: bool = False,
+        skip_existing: bool = False,
         include_paths: Optional[List[str]] = None,
         exclude_paths: Optional[List[str]] = None,
         files_regex_pattern: Optional[str] = None,
@@ -87,7 +87,7 @@ def __init__(
                 file names will also be the same.
             metadata_prefix (str): The prefix of the metadata files to save. This can be a local path or an
                 S3 path. Metadata output will be created for each file after it is processed. Filenames are
-                checked to verify if a file has been processed and can be skipped unless `ignore_existing` is
+                checked to verify if a file has been processed and can be skipped unless `skip_existing` is
                 set to true.
             num_processes (int, optional): The number of processes to use. Defaults to 1.
             debug (bool, optional): Whether to run in debug mode; if true, no multiprocessing will be used.
@@ -95,7 +95,7 @@ def __init__(
             seed (int, optional): The random seed to use when shuffling input files. Defaults to 0.
             pbar_timeout (float, optional): How often to update progress bars in seconds.
                 Defaults to 0.01 seconds.
-            ignore_existing (bool, optional): Whether to ignore files that have been already processed and
+            skip_existing (bool, optional): Whether to ignore files that have been already processed and
                 re-run the processor on all files from scratch. Defaults to False.
             include_paths (Optional[List[str]], optional): A list of paths to include. If provided, only files
                 that match one of the paths will be processed. Defaults to None.
@@ -118,7 +118,7 @@ def __init__(
         self.debug = debug
         self.seed = seed
         self.pbar_timeout = pbar_timeout
-        self.ignore_existing = ignore_existing
+        self.skip_existing = skip_existing
         self.include_paths = set(include_paths) if include_paths is not None else None
         self.exclude_paths = set(exclude_paths) if exclude_paths is not None else None
 
@@ -354,7 +354,7 @@ def __add__(self: BPP, other: BPP) -> BPP:
             debug=self.debug or other.debug,
             seed=self.seed,
             pbar_timeout=max(self.pbar_timeout, other.pbar_timeout),
-            ignore_existing=self.ignore_existing or other.ignore_existing,
+            skip_existing=self.skip_existing or other.skip_existing,
             include_paths=include_paths,
             exclude_paths=exclude_paths,
             files_regex_pattern=regex_pattern,
@@ -484,7 +484,7 @@ def _get_all_paths(self) -> AllPathsTuple:
         )
 
         for path in rel_paths:
-            if not self.ignore_existing and path in existing_metadata_names:
+            if not self.skip_existing and path in existing_metadata_names:
                 continue
 
             if not self._valid_path(path):
diff --git a/python/dolma/core/runtime.py b/python/dolma/core/runtime.py
index dc751c54..b563ea9d 100644
--- a/python/dolma/core/runtime.py
+++ b/python/dolma/core/runtime.py
@@ -298,9 +298,9 @@ def process_single(
         # total number of documents processed
         total_docs_cnt = 0
 
-        if not kwargs.get("ignore_existing", False):
+        if kwargs.get("skip_existing", False):
             # we group taggers by their path (this is for cases when two taggers are going to same file)
-            # and then remove all taggers if any of the paths exists and ignore_existing is True
+            # and then remove all taggers if any of the paths exists and skip_existing is True
             _taggers_by_path: Dict[str, list[str]] = {}
             for tagger_name, tagger_location in taggers_paths.items():
                 _taggers_by_path.setdefault(tagger_location.path, []).append(tagger_name)
@@ -419,7 +419,7 @@ def create_and_run_tagger(
     metadata: Union[None, str, List[str]] = None,
     debug: bool = False,
     seed: int = 0,
-    ignore_existing: bool = False,
+    skip_existing: bool = False,
     skip_on_failure: bool = False,
     retries_on_error: int = 0,
     num_processes: int = 1,
@@ -447,7 +447,7 @@ def create_and_run_tagger(
            which documents have been processed. If `None`, the metadata will be saved in a temporary directory.
        debug (bool, optional): Whether to run in debug mode. Defaults to False.
        seed (int, optional): The seed to use for the random number generator. Defaults to 0.
-       ignore_existing (bool, optional): Whether to ignore existing outputs and re-run the taggers.
+       skip_existing (bool, optional): Whether to ignore existing outputs and re-run the taggers.
            Defaults to False.
        skip_on_failure (bool, optional): Whether to skip a document if it fails to process. Defaults to False.
        retries_on_error (int, optional): Number of times to retry processing a document if it fails.
@@ -502,7 +502,7 @@ def create_and_run_tagger(
         metadata_prefix=metadata,
         debug=debug or profile_enable,  # if profile is true, debug must be true
         seed=seed,
-        ignore_existing=ignore_existing,
+        skip_existing=skip_existing,
         retries_on_error=retries_on_error,
         num_processes=num_processes,
     )
diff --git a/python/dolma/warc/processor.py b/python/dolma/warc/processor.py
index 7d5c9692..71e2adc7 100644
--- a/python/dolma/warc/processor.py
+++ b/python/dolma/warc/processor.py
@@ -239,7 +239,7 @@ def create_and_run_warc_pipeline(
     metadata: Union[None, str, List[str]] = None,
     debug: bool = False,
     seed: int = 0,
-    ignore_existing: bool = False,
+    skip_existing: bool = False,
     skip_on_failure: bool = False,
     retries_on_error: int = 0,
     num_processes: int = 1,
@@ -291,7 +291,7 @@ def create_and_run_warc_pipeline(
         metadata_prefix=all_meta_paths,
         debug=debug,
         seed=seed,
-        ignore_existing=ignore_existing,
+        skip_existing=skip_existing,
         retries_on_error=retries_on_error,
         num_processes=num_processes,
     )
diff --git a/tests/python/test_parallel.py b/tests/python/test_parallel.py
index 1287247a..1fd74244 100644
--- a/tests/python/test_parallel.py
+++ b/tests/python/test_parallel.py
@@ -41,7 +41,7 @@ def test_base_parallel_processor(self):
                 source_prefix=str(LOCAL_DATA / "expected"),
                 destination_prefix=f"{d}/destination",
                 metadata_prefix=f"{d}/metadata",
-                ignore_existing=False,
+                skip_existing=False,
             )
             proc()
             src = [p for p in os.listdir(LOCAL_DATA / "expected") if not p.startswith(".")]
@@ -55,7 +55,7 @@ def test_base_parallel_processor(self):
                 source_prefix=str(LOCAL_DATA / "expected" / "*-paragraphs.*"),
                 destination_prefix=f"{d}/destination",
                 metadata_prefix=f"{d}/metadata",
-                ignore_existing=False,
+                skip_existing=False,
             )
             proc()
             src = [p for p in os.listdir(LOCAL_DATA / "expected") if "paragraphs" in p]
diff --git a/tests/python/test_warc.py b/tests/python/test_warc.py
index 04f0e9a7..57fd9b45 100644
--- a/tests/python/test_warc.py
+++ b/tests/python/test_warc.py
@@ -27,7 +27,7 @@ def _run_pipeline(self, html: bool = False, pretag: bool = False) -> Dict[str, L
             documents=[f"{DATA_PATH}/*.warc.gz"],
             destination=[self.tempdir],
             num_processes=1,
-            ignore_existing=False,
+            skip_existing=False,
             debug=True,
             source_name="test",
             skip_no_pre_taggers=pretag,
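
For reference, a minimal sketch of invoking the tagger entry point with the renamed keyword. Only `taggers`, `metadata`, `num_processes`, `debug`, and `skip_existing` are visible as keywords in the hunks above; the `documents` and `destination` keyword names, the paths, and the tagger name below are illustrative assumptions.

    # Usage sketch (illustrative): `skip_existing` replaces the old `ignore_existing` argument.
    from dolma.core.runtime import create_and_run_tagger

    create_and_run_tagger(
        documents=["/data/example/documents/*.jsonl.gz"],  # assumed keyword, placeholder path
        destination=["/data/example/attributes"],          # assumed keyword, placeholder path
        taggers=["random_number_v1"],                       # placeholder tagger name
        metadata=None,       # per the docstring above, None writes metadata to a temporary directory
        num_processes=1,
        debug=True,          # disable parallelism, as documented in docs/taggers.md
        skip_existing=True,  # renamed from ignore_existing in this change
    )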