-
Notifications
You must be signed in to change notification settings - Fork 118
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Adding script that validates if mixer config is well formatted and has everything in place * Add S3 path validation with boto3 existence check * Adding check of the files, trying to run jq expressions on them and see if both files and jq expressions are valid * Add S3 path validation, sampling, and doc-attribute alignment checks * adding logic to split jsonpath expressions into pieces and check them * Added JsonPath syntax evaluation, started working on sampling docs and checking their content * Adding logic to check if all doc and corresponding attributes files contain correct fields and same anount of lines * Adding functionality to check if filters in config and attribute files match * updating filter checking logic to focus on filters missing from the mixer config * adding logic to run jq and jsonpath filters on small set of docs to see if they work or fail * refactored to use smart open and added logic to download sample files to a temp folder * added logic to sample lines from doc and apply filters to it, refactored main, added logic to download sample files and work with them locally * Adding clean up logic to delete sample files after the run * adding test configs for mixer validator * addressing comments, spliting script into smaller files, moving test configs to test folder, adding a couple of helpers functions * adding --verbose method, support of .env variables * supporting != operator * updating types in function definitions, updating Readme * adding more error handlers * deleting the initial version of the script --------- Co-authored-by: Masha Iureva <[email protected]>
- Loading branch information
1 parent
153777e
commit 0c0f10c
Showing
15 changed files
with
1,347 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
streams: | ||
- name: cc_tiny_subset | ||
documents: | ||
- s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/documents/**/*.jsonl.gz | ||
|
||
attributes: | ||
- cc_tiny_subset_analysis_october | ||
- bff_duplicate_paragraph_spans_new | ||
|
||
output: | ||
path: s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/mixed-output-exclude-bff | ||
max_size_in_bytes: 4294967296 | ||
discard_fields: | ||
- attributes | ||
|
||
filter: | ||
syntax: jq | ||
include: [] | ||
exclude: | ||
# Language filter (using both cld2 and fasttext) | ||
- (.attributes.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 0.8 else false end) | ||
- (.attributes.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 0.8 else false end) | ||
# Document length filter | ||
- (.attributes.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 100 else false end) | ||
# NSFW content filter | ||
- (.attributes.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.5 else false end) | ||
# Gopher quality filter | ||
- (.attributes.cc_tiny_subset_analysis_october__gopher_v2__word_count | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 50 else false end) | ||
- (.attributes.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.1 else false end) | ||
# Deduplication filter (BFF) | ||
- (.attributes.bff_duplicate_paragraph_spans_new | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] >= 1.0 else false end) | ||
# C4 quality filter | ||
- (.attributes.cc_tiny_subset_analysis_october__c4_v2__line_bullets | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.5 else false end) | ||
- (.attributes.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.3 else false end) | ||
|
||
span_replacement: | ||
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__EMAIL_ADDRESS" | ||
min_score: 0.5 | ||
replacement: " |||EMAIL_ADDRESS||| " | ||
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__PHONE_NUMBER" | ||
min_score: 0.5 | ||
replacement: " |||PHONE_NUMBER||| " | ||
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__IP_ADDRESS" | ||
min_score: 0.5 | ||
replacement: " |||IP_ADDRESS||| " | ||
|
||
work_dir: | ||
input: "/tmp/cc_tiny_subset_mix/input" | ||
output: "/tmp/cc_tiny_subset_mix/output" | ||
|
||
processes: 16 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
streams: | ||
- name: cc_tiny_subset | ||
documents: | ||
- s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/documents/**/*.jsonl.gz | ||
|
||
attributes: | ||
- cc_tiny_subset_analysis_october | ||
- bff_duplicate_paragraph_spans_new | ||
|
||
output: | ||
path: s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/mixed-output | ||
max_size_in_bytes: 4294967296 | ||
discard_fields: | ||
- attributes | ||
|
||
filter: | ||
include: [] | ||
exclude: | ||
# Language filter (using both cld2 and fasttext) | ||
- "$.attributes[?(@.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en && @.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en[0] && @.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en[0][2] < 0.8)]" | ||
- "$.attributes[?(@.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en && @.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en[0] && @.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en[0][2] < 0.8)]" | ||
# Document length filter | ||
- "$.attributes[?(@.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document && @.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document[0] && @.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document[0][2] < 100)]" | ||
# Deduplication filter | ||
- "$@.attributes[?(@.bff_duplicate_paragraph_spans_new && @.bff_duplicate_paragraph_spans_new[0] && @.bff_duplicate_paragraph_spans_new[0][2] >= 1.0)]" | ||
# NSFW content filter | ||
- "$.attributes[?(@.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw && @.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw[0] && @.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw[0][2] > 0.5)]" | ||
# Gopher quality filter (example, adjust threshold as needed) | ||
- "$.attributes[?(@.cc_tiny_subset_analysis_october__gopher_v2__word_count && @.cc_tiny_subset_analysis_october__gopher_v2__word_count[0] && @.cc_tiny_subset_analysis_october__gopher_v2__word_count[0][2] < 50)]" | ||
- "$.attributes[?(@.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio && @.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio[0] && @.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio[0][2] > 0.1)]" | ||
# C4 quality filter (example, adjust threshold as needed) | ||
- "$.attributes[?(@.cc_tiny_subset_analysis_october__c4_v2__line_bullets && @.cc_tiny_subset_analysis_october__c4_v2__line_bullets[0] && @.cc_tiny_subset_analysis_october__c4_v2__line_bullets[0][2] > 0.5)]" | ||
- "$.attributes[?(@.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat && @.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat[0] && @.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat[0][2] > 0.3)]" | ||
|
||
span_replacement: | ||
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__EMAIL_ADDRESS" | ||
min_score: 0.5 | ||
replacement: " |||EMAIL_ADDRESS||| " | ||
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__PHONE_NUMBER" | ||
min_score: 0.5 | ||
replacement: " |||PHONE_NUMBER||| " | ||
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__IP_ADDRESS" | ||
min_score: 0.5 | ||
replacement: " |||IP_ADDRESS||| " | ||
|
||
work_dir: | ||
input: "/tmp/cc_tiny_subset_mix/input" | ||
output: "/tmp/cc_tiny_subset_mix/output" | ||
|
||
processes: 16 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
# Dolma Mixer Configuration Validator | ||
|
||
This script validates the configuration for the Dolma Mixer, ensuring that all necessary components are correctly set up before running the main process. | ||
|
||
## Features | ||
|
||
The validator performs the following checks: | ||
|
||
1. Verifies the presence and format of required fields in the configuration. | ||
2. Validates the syntax of the configuration file (YAML or JSON). | ||
3. Checks for duplicate keys in the configuration. | ||
4. Validates JQ or JSONPath expressions for syntax and compilation. | ||
5. Verifies S3 path syntax and accessibility. | ||
6. Confirms write permissions for output paths. | ||
7. Checks the existence and accessibility of attribute files. | ||
8. Samples a subset of files for detailed validation. | ||
9. Ensures alignment between document and attribute files. | ||
10. Validates the format and content of sampled files. | ||
11. Executes JQ or JSONPath commands on sampled files. | ||
12. Validates nested key existence in filter expressions. | ||
|
||
## Usage | ||
|
||
Run the validator using the following command: | ||
|
||
``` | ||
python scripts/validate_mixer/main.py <path_to_config_file> [--num_samples <number>] [--verbose] | ||
``` | ||
|
||
- `<path_to_config_file>`: Path to your Dolma Mixer configuration file (required) | ||
- `--num_samples <number>`: (Optional) Number of file samples to validate (default: 1) | ||
- `--verbose`: (Optional) Enable verbose output | ||
|
||
## Output | ||
|
||
The script provides detailed progress information and error messages for any validation failures, helping you troubleshoot configuration issues before running the main Dolma Mixer process. | ||
|
||
## Keyboard Interrupt | ||
|
||
The script handles keyboard interrupts (Ctrl+C) gracefully. | ||
|
||
## Exit Status | ||
|
||
The script will exit with a non-zero status if any validation step fails. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from .config_handler import load_config, validate_config_structure, validate_stream, validate_output, validate_filter_config | ||
from .validator import load_and_validate_config, validate_s3_paths_and_permissions, validate_stream_filters, validate_documents_and_attributes | ||
from .file_operations import sample_files, download_file, sample_and_download_files, count_file_lines, check_attribute_name_typos, sample_file_lines, sample_documents_with_attributes, validate_jsonl, validate_filters_and_check_typos, sample_and_extract_attributes | ||
from .filter_operations import validate_jq_expression, validate_jsonpath_expression, validate_filter_expressions, evaluate_comparison, evaluate_jsonpath_condition, split_complex_jsonpath, prepare_filter, execute_filter_commands, extract_attribute_names_from_filters, extract_filter_attributes | ||
from .s3_utils import validate_s3_path, check_s3_path_exists, check_s3_path_writable, check_s3_parent_exists, list_s3_objects, get_base_path, get_corresponding_attribute_path | ||
from .utils import keyboard_interrupt_handler | ||
from .env_handler import load_env_variables, expand_env_vars |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
import yaml | ||
import json | ||
import os | ||
from typing import Dict, Any, List, Union, Type | ||
from env_handler import expand_env_vars_in_config | ||
|
||
def load_config(config_path: str) -> Dict[str, Any]:
    """Load a mixer configuration file and expand its environment variables.

    The parser is chosen from the file extension: ``.yaml``/``.yml`` files go
    through ``yaml.safe_load`` and ``.json`` files through ``json.load``. The
    parsed mapping is then passed to ``expand_env_vars_in_config``.

    Args:
        config_path: Path to the configuration file.

    Returns:
        The parsed configuration with environment variables expanded.

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
        ValueError: If the extension is unsupported, the file cannot be
            parsed, the top-level value is not a mapping (for example an
            empty file), or expansion fails. The underlying exception is
            attached as ``__cause__``.
    """
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Config file not found at path: {config_path}")
    try:
        with open(config_path, 'r', encoding='utf-8') as file:
            if config_path.endswith(('.yaml', '.yml')):
                config = yaml.safe_load(file)
            elif config_path.endswith('.json'):
                config = json.load(file)
            else:
                raise ValueError("Unsupported file format. Use .yaml, .yml, or .json")

        # An empty YAML file parses to None and a JSON file may hold a list;
        # report that clearly instead of failing later with an opaque TypeError.
        if not isinstance(config, dict):
            raise ValueError(
                f"Config must be a mapping at the top level, got {type(config).__name__}"
            )

        return expand_env_vars_in_config(config)
    except Exception as e:
        # Keep the single ValueError contract but preserve the original
        # exception for debugging.
        raise ValueError(f"Error loading config file: {str(e)}") from e
|
||
def validate_config_structure(config: Dict[str, Any]) -> List[str]:
    """Check that the top-level required fields exist and are well formed.

    ``streams`` and ``processes`` are required. A missing field yields a
    "Missing required field" message; a present field is handed to
    ``validate_streams`` or ``validate_processes`` respectively.

    Returns:
        All error messages found, in field order (empty when valid).
    """
    problems = []

    for name in ('streams', 'processes'):
        if name not in config:
            problems.append(f"Missing required field: {name}")
            continue

        value = config[name]
        if name == 'streams':
            problems += validate_streams(value)
        else:
            problems += validate_processes(value)

    return problems
|
||
def validate_streams(streams: Any) -> List[str]:
    """Validate the ``streams`` section of the configuration.

    Returns a single error when ``streams`` is not a list; otherwise the
    concatenated results of ``validate_stream`` for every entry, in order.
    """
    if not isinstance(streams, list):
        return ["'streams' should be a list"]

    return [
        message
        for position, entry in enumerate(streams)
        for message in validate_stream(entry, position)
    ]
|
||
def validate_processes(processes: Any) -> List[str]:
    """Validate the top-level ``processes`` value.

    Returns:
        A one-element list with an error message when ``processes`` is not an
        integer, otherwise an empty list.
    """
    # bool is a subclass of int, so without the explicit check a config with
    # `processes: true` would be accepted as an integer.
    if isinstance(processes, bool) or not isinstance(processes, int):
        return ["'processes' should be an integer"]
    return []
|
||
def validate_stream(stream: Dict[str, Any], index: int) -> List[str]:
    """Validate an individual stream configuration.

    Checks that ``name``, ``documents``, ``attributes`` and ``output`` are
    present with the expected types (via ``validate_field``), then validates
    the ``output`` section and, when present, the ``filter`` section.

    Malformed input is reported as an error message rather than raising, so
    one bad stream does not abort validation of the whole config.

    Args:
        stream: The stream entry from the ``streams`` list.
        index: Position of the stream, used to label error messages.

    Returns:
        All error messages found for this stream (empty when valid).
    """
    if not isinstance(stream, dict):
        return [f"Stream {index}: should be a dictionary"]

    expected_types = {
        'name': str,
        'documents': list,
        'attributes': list,
        'output': dict,
    }
    errors = []

    for field, expected in expected_types.items():
        errors.extend(validate_field(stream, field, expected, index))

    # A wrongly typed 'output' has already been reported by validate_field;
    # only descend into it when it is actually a mapping.
    output = stream.get('output')
    if isinstance(output, dict):
        errors.extend(validate_output(output, index))

    if 'filter' in stream:
        filter_config = stream['filter']
        if isinstance(filter_config, dict):
            errors.extend(validate_filter_config(filter_config, index))
        elif filter_config is not None:
            # NOTE(review): a null filter is skipped on the assumption that
            # the mixer treats it as "no filter" - confirm against the mixer.
            errors.append(f"Stream {index} filter: should be a dictionary")
    return errors
|
||
def validate_field(stream: Dict[str, Any], field: str, expected_type: Union[Type, List[Type]], stream_index: int) -> List[str]:
    """Check that a field is present in the stream and has the expected type.

    Args:
        stream: The stream configuration being validated.
        field: Name of the required field.
        expected_type: A single type, or a list/tuple of acceptable types.
        stream_index: Position of the stream, used to label error messages.

    Returns:
        A one-element list with an error message when the field is missing or
        has the wrong type, otherwise an empty list.
    """
    if field not in stream:
        return [f"Stream {stream_index}: Missing required field: {field}"]

    # isinstance() accepts a type or a tuple of types but raises TypeError on
    # a list, which the signature explicitly allows - normalise to a tuple.
    if isinstance(expected_type, (list, tuple)):
        accepted = tuple(expected_type)
    else:
        accepted = (expected_type,)

    if isinstance(stream[field], accepted):
        return []

    type_name = " or ".join(getattr(t, '__name__', str(t)) for t in accepted)
    return [f"Stream {stream_index}: '{field}' should be a {type_name}"]
|
||
def validate_output(output: Dict[str, Any], stream_index: int) -> List[str]:
    """Validate the output configuration of a stream.

    ``path`` and ``max_size_in_bytes`` are required, and
    ``max_size_in_bytes`` must be an integer.

    Args:
        output: The ``output`` section of a stream.
        stream_index: Position of the stream, used to label error messages.

    Returns:
        All error messages found (empty when valid).
    """
    # Membership tests on a non-mapping either raise (int, None) or silently
    # do substring/element matching (str, list), so reject those up front.
    if not isinstance(output, dict):
        return [f"Stream {stream_index} output: should be a dictionary"]

    errors = [
        f"Stream {stream_index} output: Missing required field: {field}"
        for field in ('path', 'max_size_in_bytes')
        if field not in output
    ]

    if 'max_size_in_bytes' in output:
        max_size = output['max_size_in_bytes']
        # bool is a subclass of int; `max_size_in_bytes: true` is not a size.
        if isinstance(max_size, bool) or not isinstance(max_size, int):
            errors.append(f"Stream {stream_index} output: 'max_size_in_bytes' should be an integer")

    return errors
|
||
def validate_filter_config(filter_config: Dict[str, Any], stream_index: int) -> List[str]:
    """Validate the filter configuration of a stream.

    ``include`` and ``exclude`` are both optional, but each must be a list
    when present. Errors are reported in that order.
    """
    return [
        f"Stream {stream_index} filter: '{key}' should be a list"
        for key in ('include', 'exclude')
        if key in filter_config and not isinstance(filter_config[key], list)
    ]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
# env_handler.py | ||
import os | ||
import re | ||
from dotenv import load_dotenv | ||
from utils import vprint | ||
|
||
def load_env_variables() -> None:
    """Load environment variables by calling ``load_dotenv()`` with its defaults.

    The return value of ``load_dotenv`` is discarded.
    """
    load_dotenv()
|
||
def expand_custom_env_vars(value):
    """Replace every ``${oc.env:VAR_NAME}`` reference in ``value``.

    Each reference is substituted with the value of the named environment
    variable. When the variable is not set, a warning is printed and the
    reference is left in the string unchanged.
    """
    env_reference = re.compile(r'\${oc\.env:([^}]+)}')

    def _substitute(found):
        name = found.group(1)
        resolved = os.environ.get(name)
        if resolved is not None:
            return resolved
        print(f"Warning: Environment variable {name} not found")
        # Keep the original placeholder so the unresolved reference stays visible.
        return found.group(0)

    return env_reference.sub(_substitute, value)
|
||
def expand_env_vars_in_config(config):
    """Expand environment variables in each stream's documents and output path.

    For every stream, each string entry of ``documents`` and the string
    ``output.path`` are passed through ``expand_custom_env_vars``. The config
    is modified in place and also returned.

    Values with an unexpected shape (non-list ``streams`` or ``documents``,
    non-dict stream or ``output``, non-string entries) are left untouched so
    that later structural validation can report them. In particular a string
    ``documents`` value is not iterated, which would otherwise turn it into a
    list of single characters and mask the type error.
    """
    if not isinstance(config, dict):
        return config

    streams = config.get('streams')
    if not isinstance(streams, list):
        return config

    for stream in streams:
        if not isinstance(stream, dict):
            continue

        documents = stream.get('documents')
        if isinstance(documents, list):
            stream['documents'] = [
                expand_custom_env_vars(doc) if isinstance(doc, str) else doc
                for doc in documents
            ]
            vprint(f"Expanded documents: {stream['documents']}")  # Debug print

        output = stream.get('output')
        if isinstance(output, dict) and isinstance(output.get('path'), str):
            output['path'] = expand_custom_env_vars(output['path'])
            vprint(f"Expanded output path: {stream['output']['path']}")  # Debug print
    return config
Oops, something went wrong.