diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 66a7742f..838abff0 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -100,7 +100,7 @@ jobs: if: steps.cache-venv.outputs.cache-hit != 'true' uses: actions/setup-python@v4 with: - python-version: "3.8" + python-version: "3.9" architecture: "x64" - name: Create a new Python environment & install maturin diff --git a/Cargo.lock b/Cargo.lock index a17e9105..1457f246 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -987,7 +987,7 @@ dependencies = [ [[package]] name = "dolma" -version = "1.0.9" +version = "1.1.0" dependencies = [ "adblock", "ahash", diff --git a/Cargo.toml b/Cargo.toml index 44963812..5d6fdadc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "dolma" -version = "1.0.9" +version = "1.1.0" edition = "2021" license = "Apache-2.0" diff --git a/pyproject.toml b/pyproject.toml index e7cc8cdc..dc89ab78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,10 +1,10 @@ [project] name = "dolma" -version = "1.0.14.post1" +version = "1.1.0" description = "Data filters" license = { text = "Apache-2.0" } readme = "README.md" -requires-python = ">=3.8" +requires-python = ">=3.9" dependencies = [ "anyascii>=0.3.2", "blingfire==0.1.8", @@ -196,12 +196,12 @@ exclude = ''' | tests/work ) ''' -target-version = ["py38", "py39", "py310", "py311", "py312"] +target-version = ["py39", "py310", "py311", "py312"] [tool.isort] profile = "black" -py_version = 38 +py_version = 39 known_first_party = ["dolma"] known_local_folder = ["tests", "python"] extend_skip_glob = [ @@ -222,7 +222,7 @@ recursive = true aggressive = 3 [tool.mypy] -python_version = "3.8" +python_version = "3.9" ignore_missing_imports = true no_site_packages = true allow_redefinition = false diff --git a/python/dolma/cli/deduper.py b/python/dolma/cli/deduper.py index c4a60a66..de6a43d5 100644 --- a/python/dolma/cli/deduper.py +++ b/python/dolma/cli/deduper.py @@ -1,3 +1,5 @@ +import fnmatch +import os 
from contextlib import ExitStack from dataclasses import dataclass from pathlib import Path @@ -99,6 +101,13 @@ class DedupeConfig: partition_index: Optional[int] = field( default=0, help="The index of the partition being processed, in the range [0, num_partitions)." ) + file_partition: Optional[bool] = field( + default=False, help="Whether or not to partition at the document level (vs at the span level)" + ) + document_dir: Optional[str] = field( + default="documents", + help="The folder in source paths to replace with 'attributes' to store results, if not 'documents'", + ) @dataclass @@ -135,7 +144,6 @@ def run(cls, parsed_config: DeduperConfig): logger = get_logger("tagger") dict_config: Dict[str, Any] = {} - with ExitStack() as stack: work_dirs = stack.enter_context(make_workdirs(parsed_config.work_dir)) @@ -146,6 +154,8 @@ def run(cls, parsed_config: DeduperConfig): "min_words": parsed_config.dedupe.min_words, "num_partitions": parsed_config.dedupe.num_partitions, "partition_index": parsed_config.dedupe.partition_index, + "file_partition": parsed_config.dedupe.file_partition, + "document_dir": parsed_config.dedupe.document_dir, } try_name = parsed_config.dedupe.name if not om.is_missing(parsed_config.dedupe, "name") else None @@ -182,7 +192,17 @@ def run(cls, parsed_config: DeduperConfig): # perform some path validation to make sure we don't call the mixer with invalid config total_matching_documents = 0 for document in parsed_config.documents: - dict_config.setdefault("documents", []).append(str(document)) + + if not any( + fnmatch.fnmatch(dict_config["dedupe"]["document_dir"], part) for part in document.split(os.sep) + ): + raise DolmaConfigError( + f"Path ({document}) does not contain expected document directory: '/{dict_config['dedupe']['document_dir']}/'. 
" + ) + + doc = str(document) + + dict_config.setdefault("documents", []).append(doc) current_matching_documents = sum(1 for _ in glob_path(document)) if current_matching_documents == 0: diff --git a/python/dolma/warc/processor.py b/python/dolma/warc/processor.py index c59f6f51..474c6ca9 100644 --- a/python/dolma/warc/processor.py +++ b/python/dolma/warc/processor.py @@ -134,9 +134,10 @@ def process_single( extension = extension.replace(".gz", "").replace(".warc", "") + ".jsonl.gz" destination_path = join_path(prot, *base_dst[:-1], base_dst[-1] + extension) - with smart_open.open(source_path, "rb") as warc_file, smart_open.open( - destination_path, "wb" - ) as output_file: + with ( + smart_open.open(source_path, "rb") as warc_file, + smart_open.open(destination_path, "wb") as output_file, + ): it = ArchiveIterator(warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo) for record in it: if record.record_type == WarcRecordType.warcinfo: diff --git a/src/deduper.rs b/src/deduper.rs index f2ad99b1..9aef26bf 100644 --- a/src/deduper.rs +++ b/src/deduper.rs @@ -14,8 +14,9 @@ use crate::s3_util; use crate::shard::shard_config::{CompressionConfig, WorkDirConfig}; use crate::shard::{find_objects_matching_patterns, FileCache}; use crate::wimbd::tokens::tokenize; - +use ahash::RandomState; use deduper_config::*; +use std::hash::{BuildHasher, Hash, Hasher}; pub fn run(config: DeduperConfig) -> Result { let bloom_filter = BloomFilter::initialize(&config.bloom_filter).unwrap(); @@ -33,7 +34,20 @@ pub fn run(config: DeduperConfig) -> Result { let threadpool = ThreadPool::new(config.processes); let failed_shard_count = AtomicU32::new(0); let failed_shard_count_ref = Arc::new(failed_shard_count); + let hash_builder = RandomState::with_seeds(0, 1, 2, 3); + for p in paths { + let mut hasher = hash_builder.build_hasher(); + p.hash(&mut hasher); + let hashed_path = hasher.finish(); + + if config.dedupe.file_partition.unwrap_or(false) + && hashed_path % 
config.dedupe.num_partitions.unwrap_or(1) + != config.dedupe.partition_index.unwrap_or(0) + { + continue; + } + let path = p.clone(); let work_dirs = config.work_dir.clone(); let dedupe = config.dedupe.clone(); @@ -121,10 +135,24 @@ fn write_attributes( ); } + let document_key = dedupe_config + .document_dir + .unwrap_or(String::from("documents")); + let attrs_location = { let attr_prefix = format!("/attributes/{}/", attr_key); - docs_location.replace("/documents/", &attr_prefix) + docs_location.replace(&format!("/{}/", &document_key), &attr_prefix) }; + + if attrs_location == docs_location { + log::error!( + "{} does not contain {} . Not writing its attributes!", + docs_location, + &document_key + ); + panic!("Attribute would be written to document location!"); + } + let local_output = cache.prepare_output(&attrs_location, label_temp)?; let mut num_processed = 0; let mut num_observed = 0; @@ -546,6 +574,8 @@ pub mod deduper_config { pub skip_empty: Option, pub num_partitions: Option, pub partition_index: Option, + pub file_partition: Option, + pub document_dir: Option, } #[derive(Serialize, Deserialize, Clone)] diff --git a/tests/config/filepath-bad.json b/tests/config/filepath-bad.json new file mode 100644 index 00000000..a08ef950 --- /dev/null +++ b/tests/config/filepath-bad.json @@ -0,0 +1,28 @@ +{ + "documents": [ + "tests/data/provided/deduper/pathnotd0cumentz/000.json.gz" + ], + "work_dir": { + "input": "tests/work/temp/dedupe-para/input", + "output": "tests/work/temp/dedupe-para/output" + }, + "dedupe": { + "name": "dedupe_paragraph_ngrams", + "paragraphs": { + "attribute_name": "bff_duplicate_paragraph_spans", + "by_ngram": { + "ngram_length": 6, + "stride": 3, + "overlap_threshold": 0.5 + } + } + }, + "bloom_filter": { + "file": "tests/work/para_bloom_filter.bin", + "size_in_bytes": 0, + "read_only": false, + "estimated_doc_count": 1000, + "desired_false_positive_rate": 0.001 + }, + "processes": 1 +} diff --git a/tests/config/filepath-good.json 
b/tests/config/filepath-good.json new file mode 100644 index 00000000..b2ee06e0 --- /dev/null +++ b/tests/config/filepath-good.json @@ -0,0 +1,29 @@ +{ + "documents": [ + "tests/data/provided/deduper/pathnotd0cumentz/000.json.gz" + ], + "work_dir": { + "input": "tests/work/temp/dedupe-para/input", + "output": "tests/work/temp/dedupe-para/output" + }, + "dedupe": { + "name": "dedupe_paragraph_ngrams", + "document_dir": "pathnotd0cumentz", + "paragraphs": { + "attribute_name": "bff_duplicate_paragraph_spans", + "by_ngram": { + "ngram_length": 6, + "stride": 3, + "overlap_threshold": 0.5 + } + } + }, + "bloom_filter": { + "file": "tests/work/para_bloom_filter.bin", + "size_in_bytes": 0, + "read_only": false, + "estimated_doc_count": 1000, + "desired_false_positive_rate": 0.001 + }, + "processes": 1 +} diff --git a/tests/data/provided/deduper/pathnotd0cumentz/000.json.gz b/tests/data/provided/deduper/pathnotd0cumentz/000.json.gz new file mode 100644 index 00000000..c1adc645 Binary files /dev/null and b/tests/data/provided/deduper/pathnotd0cumentz/000.json.gz differ diff --git a/tests/python/test_deduper.py b/tests/python/test_deduper.py index 05a3081f..214c8732 100644 --- a/tests/python/test_deduper.py +++ b/tests/python/test_deduper.py @@ -10,6 +10,7 @@ from typing_extensions import TypedDict from dolma.cli.__main__ import main +from dolma.core.errors import DolmaConfigError from dolma.core.utils import split_words from .utils import ( @@ -24,6 +25,9 @@ TEST_DIR = Path(__file__).parent.parent DEDUPE_BY_URL = TEST_DIR / "config/dedupe-by-url.json" +DEDUPE_BAD_FILENAME = TEST_DIR / "config/filepath-bad.json" +DEDUPE_GOOD_FILENAME = TEST_DIR / "config/filepath-good.json" + DEDUPE_PARAGRAPHS = TEST_DIR / "config/dedupe-paragraphs.json" DEDUPE_PARAGRAPH_NGRAMS = TEST_DIR / "config/dedupe-paragraph-ngrams.json" @@ -48,13 +52,13 @@ def setUp(self) -> None: # upload test data upload_s3_prefix( - s3_prefix=f"{self.remote_test_prefix}", 
local_prefix="tests/data/provided/deduper/documents/*.gz" + s3_prefix=f"{self.remote_test_prefix}", local_prefix="tests/data/provided/deduper/*/*.gz" ) # copy provided config files to local temp dir shutil.copytree( - "tests/data/provided/deduper/documents", - f"{self.local_temp_dir}/tests/data/provided/deduper/documents", + "tests/data/provided/deduper", + f"{self.local_temp_dir}/tests/data/provided/deduper", dirs_exist_ok=True, ) @@ -82,6 +86,33 @@ def test_dedupe_by_url(self): ) return self._compare_dedupe_output(expected, computed) # pyright: ignore + def test_dedupe_bad_filepath(self): + with open(DEDUPE_BAD_FILENAME, "r") as f: + config = json.load(f) + + config["documents"][0] = f'{self.local_temp_dir}/{config["documents"][0]}' + config["bloom_filter"]["file"] = f'{self.local_temp_dir}/{config["bloom_filter"]["file"]}' + + with NamedTemporaryFile("w") as f: + json.dump(config, f) + f.flush() + + with self.assertRaises(DolmaConfigError): + main(argv=["-c", f.name, "dedupe"]) + + def test_dedupe_good_filepath(self): + with open(DEDUPE_GOOD_FILENAME, "r") as f: + config = json.load(f) + + config["documents"][0] = f'{self.local_temp_dir}/{config["documents"][0]}' + config["bloom_filter"]["file"] = f'{self.local_temp_dir}/{config["bloom_filter"]["file"]}' + + with NamedTemporaryFile("w") as f: + json.dump(config, f) + f.flush() + + main(argv=["-c", f.name, "dedupe"]) + def test_dedupe_paragraphs(self): with open(DEDUPE_PARAGRAPHS, "r") as f: config = json.load(f)