Skip to content

Commit

Permalink
switched to skip_existing
Browse files Browse the repository at this point in the history
  • Loading branch information
soldni committed Jan 8, 2025
1 parent 6cccca7 commit 271654d
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 22 deletions.
2 changes: 1 addition & 1 deletion docs/taggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ The following parameters are supported either via CLI (e.g. `dolma tag --paramet
|`taggers`|Yes| One or more taggers to run. |
|`tagger_modules`|No| List of one or more Python modules to load taggers from. See section [*"Using Custom Taggers"*](#using-custom-taggers) for more details. |
|`processes`|No| Number of processes to use for tagging. One process is used by default. |
|`ignore_existing`|No| If true, ignore existing outputs and re-run the taggers. |
|`skip_existing`|No| If true, skip inputs whose tagger outputs already exist; if false (default), re-run the taggers and overwrite existing outputs. |
|`dryrun`|No| If true, only print the configuration and exit without running the taggers. |
|`debug`|No| If true, run in debug mode (i.e., disable parallelism). Useful when developing new taggers. |
|`profile.enable`|No| If true, enable profiling. Useful when benchmarking taggers during development. |
Expand Down
4 changes: 2 additions & 2 deletions python/dolma/cli/tagger.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class TaggerConfig:
default=1,
help="Number of parallel processes to use.",
)
ignore_existing: bool = field(
skip_existing: bool = field(
default=False,
help="Whether to ignore existing outputs and re-run the taggers.",
)
Expand Down Expand Up @@ -132,7 +132,7 @@ def run(cls, parsed_config: TaggerConfig):
metadata=work_dirs.output,
taggers=taggers,
taggers_modules=parsed_config.tagger_modules,
ignore_existing=parsed_config.ignore_existing,
skip_existing=parsed_config.skip_existing,
num_processes=parsed_config.processes,
experiment=parsed_config.experiment,
debug=parsed_config.debug,
Expand Down
4 changes: 2 additions & 2 deletions python/dolma/cli/warc.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class WarcExtractorConfig:
default=1,
help="Number of parallel processes to use.",
)
ignore_existing: bool = field(
skip_existing: bool = field(
default=False,
help="Whether to ignore existing outputs and re-run the taggers.",
)
Expand Down Expand Up @@ -107,7 +107,7 @@ def run(cls, parsed_config: WarcExtractorConfig):
destination=(destination[0] if len(destination) == 1 else destination),
metadata=work_dirs.output,
num_processes=parsed_config.processes,
ignore_existing=parsed_config.ignore_existing,
skip_existing=parsed_config.skip_existing,
debug=parsed_config.debug,
source_name=source_name,
pre_taggers=parsed_config.pre.taggers,
Expand Down
2 changes: 1 addition & 1 deletion python/dolma/core/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def create_and_run_analyzer(
metadata_prefix=metadata_path,
debug=debug,
seed=seed,
ignore_existing=True,
skip_existing=True,
retries_on_error=0,
num_processes=num_processes,
)
Expand Down
12 changes: 6 additions & 6 deletions python/dolma/core/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __init__(
debug: bool = False,
seed: int = 0,
pbar_timeout: float = 1e-3,
ignore_existing: bool = False,
skip_existing: bool = False,
include_paths: Optional[List[str]] = None,
exclude_paths: Optional[List[str]] = None,
files_regex_pattern: Optional[str] = None,
Expand All @@ -87,15 +87,15 @@ def __init__(
file names will also be the same.
metadata_prefix (str): The prefix of the metadata files to save. This can be a local path or an
S3 path. Metadata output will be created for each file after it is processed. Filenames are
checked to verify if a file has been processed and can be skipped unless `ignore_existing` is
checked to verify if a file has been processed and can be skipped unless `skip_existing` is
set to true.
num_processes (int, optional): The number of processes to use. Defaults to 1.
debug (bool, optional): Whether to run in debug mode; if true, no multiprocessing will be used.
Defaults to False.
seed (int, optional): The random seed to use when shuffling input files. Defaults to 0.
pbar_timeout (float, optional): How often to update progress bars in seconds.
Defaults to 0.01 seconds.
ignore_existing (bool, optional): Whether to ignore files that have been already processed and
skip_existing (bool, optional): Whether to ignore files that have been already processed and
re-run the processor on all files from scratch. Defaults to False.
include_paths (Optional[List[str]], optional): A list of paths to include. If provided, only files
that match one of the paths will be processed. Defaults to None.
Expand All @@ -118,7 +118,7 @@ def __init__(
self.debug = debug
self.seed = seed
self.pbar_timeout = pbar_timeout
self.ignore_existing = ignore_existing
self.skip_existing = skip_existing

self.include_paths = set(include_paths) if include_paths is not None else None
self.exclude_paths = set(exclude_paths) if exclude_paths is not None else None
Expand Down Expand Up @@ -354,7 +354,7 @@ def __add__(self: BPP, other: BPP) -> BPP:
debug=self.debug or other.debug,
seed=self.seed,
pbar_timeout=max(self.pbar_timeout, other.pbar_timeout),
ignore_existing=self.ignore_existing or other.ignore_existing,
skip_existing=self.skip_existing or other.skip_existing,
include_paths=include_paths,
exclude_paths=exclude_paths,
files_regex_pattern=regex_pattern,
Expand Down Expand Up @@ -484,7 +484,7 @@ def _get_all_paths(self) -> AllPathsTuple:
)

for path in rel_paths:
if not self.ignore_existing and path in existing_metadata_names:
if not self.skip_existing and path in existing_metadata_names:
continue

if not self._valid_path(path):
Expand Down
10 changes: 5 additions & 5 deletions python/dolma/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,9 @@ def process_single(
# total number of documents processed
total_docs_cnt = 0

if not kwargs.get("ignore_existing", False):
if kwargs.get("skip_existing", False):
# we group taggers by their path (this is for cases when two taggers are going to same file)
# and then remove all taggers if any of the paths exists and ignore_existing is True
# and then remove all taggers if any of the paths exists and skip_existing is True
_taggers_by_path: Dict[str, list[str]] = {}
for tagger_name, tagger_location in taggers_paths.items():
_taggers_by_path.setdefault(tagger_location.path, []).append(tagger_name)
Expand Down Expand Up @@ -419,7 +419,7 @@ def create_and_run_tagger(
metadata: Union[None, str, List[str]] = None,
debug: bool = False,
seed: int = 0,
ignore_existing: bool = False,
skip_existing: bool = False,
skip_on_failure: bool = False,
retries_on_error: int = 0,
num_processes: int = 1,
Expand Down Expand Up @@ -447,7 +447,7 @@ def create_and_run_tagger(
which documents have been processed. If `None`, the metadata will be saved in a temporary directory.
debug (bool, optional): Whether to run in debug mode. Defaults to False.
seed (int, optional): The seed to use for the random number generator. Defaults to 0.
ignore_existing (bool, optional): Whether to ignore existing outputs and re-run the taggers.
skip_existing (bool, optional): Whether to ignore existing outputs and re-run the taggers.
Defaults to False.
skip_on_failure (bool, optional): Whether to skip a document if it fails to process. Defaults to False.
retries_on_error (int, optional): Number of times to retry processing a document if it fails.
Expand Down Expand Up @@ -502,7 +502,7 @@ def create_and_run_tagger(
metadata_prefix=metadata,
debug=debug or profile_enable, # if profile is true, debug must be true
seed=seed,
ignore_existing=ignore_existing,
skip_existing=skip_existing,
retries_on_error=retries_on_error,
num_processes=num_processes,
)
Expand Down
4 changes: 2 additions & 2 deletions python/dolma/warc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def create_and_run_warc_pipeline(
metadata: Union[None, str, List[str]] = None,
debug: bool = False,
seed: int = 0,
ignore_existing: bool = False,
skip_existing: bool = False,
skip_on_failure: bool = False,
retries_on_error: int = 0,
num_processes: int = 1,
Expand Down Expand Up @@ -291,7 +291,7 @@ def create_and_run_warc_pipeline(
metadata_prefix=all_meta_paths,
debug=debug,
seed=seed,
ignore_existing=ignore_existing,
skip_existing=skip_existing,
retries_on_error=retries_on_error,
num_processes=num_processes,
)
Expand Down
4 changes: 2 additions & 2 deletions tests/python/test_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def test_base_parallel_processor(self):
source_prefix=str(LOCAL_DATA / "expected"),
destination_prefix=f"{d}/destination",
metadata_prefix=f"{d}/metadata",
ignore_existing=False,
skip_existing=False,
)
proc()
src = [p for p in os.listdir(LOCAL_DATA / "expected") if not p.startswith(".")]
Expand All @@ -55,7 +55,7 @@ def test_base_parallel_processor(self):
source_prefix=str(LOCAL_DATA / "expected" / "*-paragraphs.*"),
destination_prefix=f"{d}/destination",
metadata_prefix=f"{d}/metadata",
ignore_existing=False,
skip_existing=False,
)
proc()
src = [p for p in os.listdir(LOCAL_DATA / "expected") if "paragraphs" in p]
Expand Down
2 changes: 1 addition & 1 deletion tests/python/test_warc.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def _run_pipeline(self, html: bool = False, pretag: bool = False) -> Dict[str, L
documents=[f"{DATA_PATH}/*.warc.gz"],
destination=[self.tempdir],
num_processes=1,
ignore_existing=False,
skip_existing=False,
debug=True,
source_name="test",
skip_no_pre_taggers=pretag,
Expand Down

0 comments on commit 271654d

Please sign in to comment.