From 0c0f10c17d2eb940361b54baa0a836d4e49c92d1 Mon Sep 17 00:00:00 2001
From: Masha iureva <68630408+mariia-iureva@users.noreply.github.com>
Date: Mon, 21 Oct 2024 15:42:38 -0700
Subject: [PATCH] Mixer validator (#215)

* Adding script that validates if mixer config is well formatted and has everything in place
* Add S3 path validation with boto3 existence check
* Adding check of the files, trying to run jq expressions on them and see if both files and jq expressions are valid
* Add S3 path validation, sampling, and doc-attribute alignment checks
* adding logic to split jsonpath expressions into pieces and check them
* Added JsonPath syntax evaluation, started working on sampling docs and checking their content
* Adding logic to check if all doc and corresponding attributes files contain correct fields and the same number of lines
* Adding functionality to check if filters in config and attribute files match
* updating filter checking logic to focus on filters missing from the mixer config
* adding logic to run jq and jsonpath filters on a small set of docs to see if they work or fail
* refactored to use smart open and added logic to download sample files to a temp folder
* added logic to sample lines from doc and apply filters to it, refactored main, added logic to download sample files and work with them locally
* Adding cleanup logic to delete sample files after the run
* adding test configs for mixer validator
* addressing comments, splitting script into smaller files, moving test configs to test folder, adding a couple of helper functions
* adding --verbose flag, support of .env variables
* supporting != operator
* updating types in function definitions, updating Readme
* adding more error handlers
* deleting the initial version of the script

---------

Co-authored-by: Masha Iureva
---
 configs/test/test_config_jq.yaml            |  51 +++
 configs/test/test_config_jsonpath.yaml      |  50 +++
 pyproject.toml                              |   3 +
 scripts/validate_mixer/README.md            |  44 +++
 scripts/validate_mixer/__init__.py          |   7 +
 scripts/validate_mixer/config_handler.py    | 112 +++++++
 scripts/validate_mixer/env_handler.py       |  34 ++
 scripts/validate_mixer/file_operations.py   | 224 +++++++++++++
 scripts/validate_mixer/filter_operations.py | 339 ++++++++++++++++++++
 scripts/validate_mixer/main.py              |  43 +++
 scripts/validate_mixer/s3_utils.py          |  81 +++++
 scripts/validate_mixer/utils.py             |  26 ++
 scripts/validate_mixer/validator.py         | 232 ++++++++++++++
 tests/config/mixer-validator-jq.yaml        |  51 +++
 tests/config/mixer-validator-jsonpath.yaml  |  50 +++
 15 files changed, 1347 insertions(+)
 create mode 100644 configs/test/test_config_jq.yaml
 create mode 100644 configs/test/test_config_jsonpath.yaml
 create mode 100644 scripts/validate_mixer/README.md
 create mode 100644 scripts/validate_mixer/__init__.py
 create mode 100644 scripts/validate_mixer/config_handler.py
 create mode 100644 scripts/validate_mixer/env_handler.py
 create mode 100644 scripts/validate_mixer/file_operations.py
 create mode 100644 scripts/validate_mixer/filter_operations.py
 create mode 100644 scripts/validate_mixer/main.py
 create mode 100644 scripts/validate_mixer/s3_utils.py
 create mode 100644 scripts/validate_mixer/utils.py
 create mode 100644 scripts/validate_mixer/validator.py
 create mode 100644 tests/config/mixer-validator-jq.yaml
 create mode 100644 tests/config/mixer-validator-jsonpath.yaml

diff --git a/configs/test/test_config_jq.yaml b/configs/test/test_config_jq.yaml
new file mode 100644
index 00000000..5c6c4ec7
--- /dev/null
+++ b/configs/test/test_config_jq.yaml
@@ -0,0 +1,51 @@
+streams:
+ - name: cc_tiny_subset + documents: + - s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/documents/**/*.jsonl.gz + + attributes: + - cc_tiny_subset_analysis_october + - bff_duplicate_paragraph_spans_new + + output: + path: s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/mixed-output-exclude-bff + max_size_in_bytes: 4294967296 + discard_fields: + - attributes + + filter: + syntax: jq + include: [] + exclude: + # Language filter (using both cld2 and fasttext) + - (.attributes.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 0.8 else false end) + - (.attributes.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 0.8 else false end) + # Document length filter + - (.attributes.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 100 else false end) + # NSFW content filter + - (.attributes.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.5 else false end) + # Gopher quality filter + - (.attributes.cc_tiny_subset_analysis_october__gopher_v2__word_count | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 50 else false end) + - (.attributes.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.1 else false end) + # Deduplication filter (BFF) + - (.attributes.bff_duplicate_paragraph_spans_new | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] >= 1.0 else false end) + # C4 quality filter + - (.attributes.cc_tiny_subset_analysis_october__c4_v2__line_bullets | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.5 else false end) + - (.attributes.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.3 else false end) + + span_replacement: + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__EMAIL_ADDRESS" + min_score: 0.5 + replacement: " |||EMAIL_ADDRESS||| " + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__PHONE_NUMBER" + min_score: 0.5 + replacement: " |||PHONE_NUMBER||| " + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__IP_ADDRESS" + min_score: 0.5 + replacement: " |||IP_ADDRESS||| " + +work_dir: + input: "/tmp/cc_tiny_subset_mix/input" + output: "/tmp/cc_tiny_subset_mix/output" + +processes: 16 \ No newline at end of file diff --git a/configs/test/test_config_jsonpath.yaml b/configs/test/test_config_jsonpath.yaml new file mode 100644 index 00000000..106eedaf --- /dev/null +++ b/configs/test/test_config_jsonpath.yaml @@ -0,0 +1,50 @@ +streams: + - name: cc_tiny_subset + documents: + - s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/documents/**/*.jsonl.gz + + attributes: + - cc_tiny_subset_analysis_october + - bff_duplicate_paragraph_spans_new + + output: + path: s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/mixed-output + max_size_in_bytes: 4294967296 + discard_fields: + - attributes + + filter: + include: [] + exclude: 
+ # Language filter (using both cld2 and fasttext) + - "$.attributes[?(@.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en && @.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en[0] && @.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en[0][2] < 0.8)]" + - "$.attributes[?(@.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en && @.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en[0] && @.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en[0][2] < 0.8)]" + # Document length filter + - "$.attributes[?(@.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document && @.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document[0] && @.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document[0][2] < 100)]" + # Deduplication filter + - "$@.attributes[?(@.bff_duplicate_paragraph_spans_new && @.bff_duplicate_paragraph_spans_new[0] && @.bff_duplicate_paragraph_spans_new[0][2] >= 1.0)]" + # NSFW content filter + - "$.attributes[?(@.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw && @.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw[0] && @.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw[0][2] > 0.5)]" + # Gopher quality filter (example, adjust threshold as needed) + - "$.attributes[?(@.cc_tiny_subset_analysis_october__gopher_v2__word_count && @.cc_tiny_subset_analysis_october__gopher_v2__word_count[0] && @.cc_tiny_subset_analysis_october__gopher_v2__word_count[0][2] < 50)]" + - "$.attributes[?(@.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio && @.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio[0] && @.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio[0][2] > 0.1)]" + # C4 quality filter (example, adjust threshold as needed) + - "$.attributes[?(@.cc_tiny_subset_analysis_october__c4_v2__line_bullets && @.cc_tiny_subset_analysis_october__c4_v2__line_bullets[0] && @.cc_tiny_subset_analysis_october__c4_v2__line_bullets[0][2] > 0.5)]" + - "$.attributes[?(@.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat && @.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat[0] && @.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat[0][2] > 0.3)]" + + span_replacement: + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__EMAIL_ADDRESS" + min_score: 0.5 + replacement: " |||EMAIL_ADDRESS||| " + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__PHONE_NUMBER" + min_score: 0.5 + replacement: " |||PHONE_NUMBER||| " + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__IP_ADDRESS" + min_score: 0.5 + replacement: " |||IP_ADDRESS||| " + +work_dir: + input: "/tmp/cc_tiny_subset_mix/input" + output: "/tmp/cc_tiny_subset_mix/output" + +processes: 16 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index dc89ab78..a4957551 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,8 @@ dependencies = [ # "fasttext==0.9.2", # broken with new version of setuptools; using fasttext-wheel instead "fasttext-wheel==0.9.2", "fsspec>=2023.6.0", + "jsonpath-ng", + "jq", "msgspec>=0.14.2", "nltk>=3.9.1", "omegaconf>=2.3.0", @@ -20,6 +22,7 @@ dependencies = [ # "pycld3==0.22", # does not install correctly "platformdirs>=4.2.0", "pyyaml", + "python-dotenv>=0.19.0", 
"requests", "rich", "s3fs>=2023.6.0", diff --git a/scripts/validate_mixer/README.md b/scripts/validate_mixer/README.md new file mode 100644 index 00000000..1fbd63a0 --- /dev/null +++ b/scripts/validate_mixer/README.md @@ -0,0 +1,44 @@ +# Dolma Mixer Configuration Validator + +This script validates the configuration for the Dolma Mixer, ensuring that all necessary components are correctly set up before running the main process. + +## Features + +The validator performs the following checks: + +1. Verifies the presence and format of required fields in the configuration. +2. Validates the syntax of the configuration file (YAML or JSON). +3. Checks for duplicate keys in the configuration. +4. Validates JQ or JSONPath expressions for syntax and compilation. +5. Verifies S3 path syntax and accessibility. +6. Confirms write permissions for output paths. +7. Checks the existence and accessibility of attribute files. +8. Samples a subset of files for detailed validation. +9. Ensures alignment between document and attribute files. +10. Validates the format and content of sampled files. +11. Executes JQ or JSONPath commands on sampled files. +12. Validates nested key existence in filter expressions. + +## Usage + +Run the validator using the following command: + +``` +python scripts/validate_mixer/main.py [--num_samples ] [--verbose] +``` + +- ``: Path to your Dolma Mixer configuration file (required) +- `--num_samples `: (Optional) Number of file samples to validate (default: 1) +- `--verbose`: (Optional) Enable verbose output + +## Output + +The script provides detailed progress information and error messages for any validation failures, helping you troubleshoot configuration issues before running the main Dolma Mixer process. + +## Keyboard Interrupt + +The script handles keyboard interrupts (Ctrl+C) gracefully. + +## Exit Status + +The script will exit with a non-zero status if any validation step fails. 
\ No newline at end of file diff --git a/scripts/validate_mixer/__init__.py b/scripts/validate_mixer/__init__.py new file mode 100644 index 00000000..1d51eedb --- /dev/null +++ b/scripts/validate_mixer/__init__.py @@ -0,0 +1,7 @@ +from .config_handler import load_config, validate_config_structure, validate_stream, validate_output, validate_filter_config +from .validator import load_and_validate_config, validate_s3_paths_and_permissions, validate_stream_filters, validate_documents_and_attributes +from .file_operations import sample_files, download_file, sample_and_download_files, count_file_lines, check_attribute_name_typos, sample_file_lines, sample_documents_with_attributes, validate_jsonl, validate_filters_and_check_typos, sample_and_extract_attributes +from .filter_operations import validate_jq_expression, validate_jsonpath_expression, validate_filter_expressions, evaluate_comparison, evaluate_jsonpath_condition, split_complex_jsonpath, prepare_filter, execute_filter_commands, extract_attribute_names_from_filters, extract_filter_attributes +from .s3_utils import validate_s3_path, check_s3_path_exists, check_s3_path_writable, check_s3_parent_exists, list_s3_objects, get_base_path, get_corresponding_attribute_path +from .utils import keyboard_interrupt_handler +from .env_handler import load_env_variables, expand_env_vars \ No newline at end of file diff --git a/scripts/validate_mixer/config_handler.py b/scripts/validate_mixer/config_handler.py new file mode 100644 index 00000000..df360c40 --- /dev/null +++ b/scripts/validate_mixer/config_handler.py @@ -0,0 +1,112 @@ +import yaml +import json +import os +from typing import Dict, Any, List, Union, Type +from env_handler import expand_env_vars_in_config + +def load_config(config_path: str) -> Dict[str, Any]: + """Load the configuration file (YAML or JSON).""" + if not os.path.exists(config_path): + raise FileNotFoundError(f"Config file not found at path: {config_path}") + try: + with open(config_path, 'r') as file: + if config_path.endswith('.yaml') or config_path.endswith('.yml'): + config = yaml.safe_load(file) + elif config_path.endswith('.json'): + config = json.load(file) + else: + raise ValueError("Unsupported file format. 
Use .yaml, .yml, or .json") + + config = expand_env_vars_in_config(config) + return config + except Exception as e: + raise ValueError(f"Error loading config file: {str(e)}") + +def validate_config_structure(config: Dict[str, Any]) -> List[str]: + """Validate the basic structure of the configuration.""" + required_fields = ['streams', 'processes'] + errors = [] + + for field in required_fields: + if field not in config: + errors.append(f"Missing required field: {field}") + elif field == 'streams': + errors.extend(validate_streams(config[field])) + elif field == 'processes': + errors.extend(validate_processes(config[field])) + + return errors + +def validate_streams(streams: Any) -> List[str]: + errors = [] + if not isinstance(streams, list): + errors.append("'streams' should be a list") + else: + for i, stream in enumerate(streams): + stream_errors = validate_stream(stream, i) + errors.extend(stream_errors) + return errors + +def validate_processes(processes: Any) -> List[str]: + if not isinstance(processes, int): + return ["'processes' should be an integer"] + return [] + +def validate_stream(stream: Dict[str, Any], index: int) -> List[str]: + """Validate an individual stream configuration.""" + required_fields = ['name', 'documents', 'attributes', 'output'] + expected_type = { + 'name': str, + 'documents': list, + 'attributes': list, + 'output': dict + } + errors = [] + + for field in required_fields: + errors.extend(validate_field(stream, field, expected_type[field], index)) + + if 'output' in stream: + output_errors = validate_output(stream['output'], index) + errors.extend(output_errors) + + if 'filter' in stream: + filter_errors = validate_filter_config(stream['filter'], index) + errors.extend(filter_errors) + return errors + +def validate_field(stream: Dict[str, Any], field: str, expected_type: Union[Type, List[Type]], stream_index: int) -> List[str]: + """Check if a field is present in the stream and has the expected type.""" + errors = [] + if field not in stream: + errors.append(f"Stream {stream_index}: Missing required field: {field}") + elif not isinstance(stream[field], expected_type): + type_name = expected_type.__name__ if isinstance(expected_type, type) else str(expected_type) + errors.append(f"Stream {stream_index}: '{field}' should be a {type_name}") + return errors + +def validate_output(output: Dict[str, Any], stream_index: int) -> List[str]: + """Validate the output configuration of a stream.""" + required_fields = ['path', 'max_size_in_bytes'] + errors = [] + + for field in required_fields: + if field not in output: + errors.append(f"Stream {stream_index} output: Missing required field: {field}") + + if 'max_size_in_bytes' in output and not isinstance(output['max_size_in_bytes'], int): + errors.append(f"Stream {stream_index} output: 'max_size_in_bytes' should be an integer") + + return errors + +def validate_filter_config(filter_config: Dict[str, Any], stream_index: int) -> List[str]: + """Validate the filter configuration of a stream.""" + errors = [] + + if 'include' in filter_config and not isinstance(filter_config['include'], list): + errors.append(f"Stream {stream_index} filter: 'include' should be a list") + + if 'exclude' in filter_config and not isinstance(filter_config['exclude'], list): + errors.append(f"Stream {stream_index} filter: 'exclude' should be a list") + + return errors \ No newline at end of file diff --git a/scripts/validate_mixer/env_handler.py b/scripts/validate_mixer/env_handler.py new file mode 100644 index 00000000..832d3776 --- /dev/null 
+++ b/scripts/validate_mixer/env_handler.py @@ -0,0 +1,34 @@ +# env_handler.py +import os +import re +from dotenv import load_dotenv +from utils import vprint + +def load_env_variables(): + load_dotenv() + +def expand_custom_env_vars(value): + """Expand environment variables with ${oc.env:VAR_NAME} syntax.""" + pattern = r'\${oc\.env:([^}]+)}' + + def replace_env_var(match): + env_var_name = match.group(1) + env_var_value = os.getenv(env_var_name) + if env_var_value is None: + print(f"Warning: Environment variable {env_var_name} not found") + return match.group(0) # Return the original string if env var not found + return env_var_value + + return re.sub(pattern, replace_env_var, value) + +def expand_env_vars_in_config(config): + """Expand environment variables in 'documents' and 'output' sections of the config.""" + if 'streams' in config: + for stream in config['streams']: + if 'documents' in stream: + stream['documents'] = [expand_custom_env_vars(doc) for doc in stream['documents']] + vprint(f"Expanded documents: {stream['documents']}") # Debug print + if 'output' in stream and 'path' in stream['output']: + stream['output']['path'] = expand_custom_env_vars(stream['output']['path']) + vprint(f"Expanded output path: {stream['output']['path']}") # Debug print + return config \ No newline at end of file diff --git a/scripts/validate_mixer/file_operations.py b/scripts/validate_mixer/file_operations.py new file mode 100644 index 00000000..0d3ceea2 --- /dev/null +++ b/scripts/validate_mixer/file_operations.py @@ -0,0 +1,224 @@ +import os +import random +import re +from typing import Optional, List, Dict, Any, Tuple +from tqdm import tqdm +import boto3 +import json +import itertools +import smart_open +from botocore.exceptions import ClientError + +from s3_utils import s3_client, list_s3_objects, get_base_path, get_corresponding_attribute_path +from utils import vprint + +class FileDownloadError(Exception): + pass + +def sample_files(s3_path: str, num_samples: int) -> List[str]: + """Sample a subset of files from an S3 path.""" + all_files = list(list_s3_objects(s3_path)) + # Filter out directories (paths ending with '/') + all_files = [f for f in all_files if not f.endswith('/')] + chosen_files = random.sample(all_files, min(int(num_samples), len(all_files))) + print(f"Sampled {len(chosen_files)} files from {len(all_files)} matching files") + return chosen_files + +def download_file(s3_path: str, local_path: str) -> None: + bucket, key = s3_path.replace("s3://", "").split("/", 1) + try: + s3_client.download_file(bucket, key, local_path) + except ClientError as e: + if e.response['Error']['Code'] == '404': + raise FileDownloadError(f"File not found: {s3_path}") + else: + raise FileDownloadError(f"Error downloading file {s3_path}: {str(e)}") + + +def sample_and_download_files(stream: Dict[str, Any], num_samples: int) -> Tuple[List[str], Dict[str, List[str]]]: + temp_dir = "temp_sample_files" + + # Create the temporary directory if it doesn't exist + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + + try: + doc_samples = sample_files(stream['documents'][0], num_samples) + + base_doc_path = get_base_path(stream['documents'][0]) + base_attr_path = re.sub(r'/documents($|/)', r'/attributes\1', base_doc_path) + + total_files = len(doc_samples) * (len(stream['attributes']) + 1) # +1 for the document itself + + with tqdm(total=total_files, desc="Downloading files") as pbar: + local_doc_samples = [] + local_attr_samples_dict = {attr_type: [] for attr_type in stream['attributes']} + + for 
doc_sample in doc_samples: + try: + local_doc_path = os.path.join(temp_dir, os.path.basename(doc_sample)) + download_file(doc_sample, local_doc_path) + local_doc_samples.append(local_doc_path) + pbar.update(1) + + # Extract the base name and extension + base_name, extension = os.path.splitext(os.path.basename(doc_sample)) + if extension == '.gz': + # Handle double extensions like .jsonl.gz + base_name, inner_extension = os.path.splitext(base_name) + extension = inner_extension + extension + + for attr_type in stream['attributes']: + attr_sample = get_corresponding_attribute_path(doc_sample, base_doc_path, base_attr_path, attr_type) + # Construct the new filename with the attribute type before the extension, using a hyphen + new_filename = f"{base_name}-{attr_type}{extension}" + local_attr_path = os.path.join(temp_dir, new_filename) + download_file(attr_sample, local_attr_path) + local_attr_samples_dict[attr_type].append(local_attr_path) + pbar.update(1) + except FileDownloadError as e: + print(f"Warning: {str(e)}. Skipping this file and its attributes.") + continue + + return local_doc_samples, local_attr_samples_dict + + except Exception as e: + print(f"An error occurred during file sampling and downloading: {str(e)}") + raise + +def count_file_lines(file_path: str) -> int: + """ + Count the number of lines in a file (local or S3, compressed or not). + + :param file_path: Path to the file (can be S3 or local) + :return: Number of lines in the file, or -1 if an error occurred + """ + # print(f"Counting lines in file: {file_path}") + try: + with smart_open.open(file_path, 'rb') as f: + # print("successfully opened file in count_file_lines") + line_count = sum(1 for _ in f) + return line_count + except Exception as e: + print(f"Error counting lines in file {file_path}: {str(e)}") + return -1 + +def check_attribute_name_typos(config_attributes: set, sample_attributes: set) -> None: + """Check for typos in attribute names by comparing config and sample data.""" + missing_in_sample = config_attributes - sample_attributes + extra_in_sample = sample_attributes - config_attributes + + if missing_in_sample: + print("Warning: The following attributes are in the config but not in the sample data:") + for attr in missing_in_sample: + print(f" - {attr}") + + if extra_in_sample: + print("Info: The following attributes are in the sample data but not used in the config:") + for attr in extra_in_sample: + print(f" - {attr}") + +def sample_file_lines(file_path: str, num_lines: int = 1) -> Optional[List[str]]: + """ + Sample N lines from a file, handling both local and S3 paths, and compression. 
+ + Args: + file_path (str): Path to the file (local or S3) + num_lines (int): Number of lines to sample (default: 1) + + Returns: + list: List of sampled lines, or None if an error occurred + """ + try: + if not isinstance(file_path, str): + raise ValueError(f"Expected string for file_path, got {type(file_path)}") + with smart_open.open(file_path, 'r') as f: + # Use itertools.islice to efficiently read N lines + sampled_lines = list(itertools.islice(f, num_lines)) + + if not sampled_lines: + print(f"Warning: File is empty or could not be read: {file_path}") + return None + + # Strip whitespace from each line + sampled_lines = [line.strip() for line in sampled_lines] + + if len(sampled_lines) < num_lines: + print(f"Warning: Requested {num_lines} lines, but file only contains {len(sampled_lines)} lines: {file_path}") + + return sampled_lines + + except ValueError as ve: + print(f"Error in sample_file_lines: {str(ve)}") + return None + except Exception as e: + print(f"Error in sample_file_lines when reading file {file_path}: {str(e)}") + print(f"Error type: {type(e)}") + print(f"File path type: {type(file_path)}") + return None + + +def sample_documents_with_attributes(doc_file_paths: List[str], attr_file_paths: List[str], num_samples: int = 100) -> List[Dict[str, Any]]: + sampled_docs = [] + for doc_path, attr_paths in zip(doc_file_paths, attr_file_paths): + doc_lines = sample_file_lines(doc_path, num_samples) + if not doc_lines: + continue + + attr_samples = {} + for attr_path in attr_paths: + attr_lines = sample_file_lines(attr_path, num_samples) + if attr_lines: + attr_name = os.path.basename(attr_path).split('.')[0] # Extract attribute name from file name + attr_samples[attr_name] = attr_lines + + for i, doc_line in enumerate(doc_lines): + doc = json.loads(doc_line) + for attr_name, attr_lines in attr_samples.items(): + if i < len(attr_lines): + doc[attr_name] = json.loads(attr_lines[i]) + sampled_docs.append(doc) + + return sampled_docs + + +def validate_jsonl(file_path: str, expected_fields: set) -> Tuple[bool, List[str]]: + """ + Validate that the file is a valid JSONL and contains expected fields. 
+ + :param file_path: Path to the file (can be S3 or local) + :param expected_fields: Set of field names expected in each JSON object + :return: Tuple (is_valid, error_messages) + """ + unexpected_fields = set() + error_messages = [] + is_valid = True + + try: + with smart_open.open(file_path, 'r') as f: + for i, line in enumerate(f, 1): + try: + data = json.loads(line) + missing_fields = expected_fields - set(data.keys()) + new_fields = set(data.keys()) - expected_fields + + if missing_fields: + error_messages.append(f"Line {i}: Missing expected fields: {missing_fields}") + is_valid = False + + if new_fields: + unexpected_fields.update(new_fields) + is_valid = False + + except json.JSONDecodeError: + error_messages.append(f"Line {i}: Invalid JSON") + is_valid = False + + except Exception as e: + error_messages.append(f"Error reading file {file_path}: {str(e)}") + is_valid = False + + if unexpected_fields: + error_messages.append(f"Additional fields found across the file: {unexpected_fields}") + return is_valid, error_messages + diff --git a/scripts/validate_mixer/filter_operations.py b/scripts/validate_mixer/filter_operations.py new file mode 100644 index 00000000..554db193 --- /dev/null +++ b/scripts/validate_mixer/filter_operations.py @@ -0,0 +1,339 @@ +import re +import json +from typing import Optional, Union, List, Dict, Any, Tuple +import jq +from jsonpath_ng.ext import parse as parse_jsonpath +from file_operations import sample_file_lines +from utils import vprint + +def validate_jq_expression(expr: str) -> Tuple[bool, Optional[str]]: + """Validate a JQ expression.""" + try: + jq.compile(expr) + return True, None + except ValueError as e: + return False, str(e) + +def validate_jsonpath_expression(expr: str) -> Tuple[bool, Optional[str]]: + """Validate a JSONPath expression.""" + try: + parse_jsonpath(expr) + return True, None + except Exception as e: + return False, str(e) + +def validate_filter_expressions(filter_config: Dict[str, Any]) -> Tuple[List[str], List[str]]: + """Validate filter expressions based on specified syntax.""" + errors = [] + warnings = [] + + syntax = filter_config.get('syntax', 'jsonpath').lower() # Default to JSONPath if not specified + + if syntax not in ['jq', 'jsonpath']: + warnings.append(f"Unsupported syntax '{syntax}'. Defaulting to JSONPath for validation.") + syntax = 'jsonpath' + + validate_function = validate_jq_expression if syntax == 'jq' else validate_jsonpath_expression + + for filter_type in ['include', 'exclude']: + expressions = filter_config.get(filter_type, []) + for expr in expressions: + if syntax == 'jsonpath': + # Check for and remove '@' only if it appears in "$@." pattern at the beginning + if expr.startswith('$@.'): + expr = '$' + expr[2:] + warnings.append(f"Removed '@' from '$@.' at the beginning of the expression: {expr}") + + # conditions = split_complex_jsonpath(expr) + conditions = split_complex_jsonpath(expr) + for condition in conditions: + # Check for comparison operators + skip_operators = ["==", "<=", ">=", "<", ">"] + if any(op in condition for op in skip_operators): + operator = next(op for op in skip_operators if op in condition) + # warnings.append(f"Temporarily skipping expression because it contains '{operator}' operator: {condition}") + continue + valid, error = validate_function(condition) + if not valid: + errors.append(f"Invalid {syntax.upper()} sub-expression in {filter_type}: {condition}. 
Error: {error}") + else: + is_valid, error = validate_function(expr) + if not is_valid: + errors.append(f"Invalid {syntax.upper()} expression in {filter_type}: {expr}. Error: {error}") + + return errors, warnings + + +def validate_filters_and_check_typos(attr_file_paths: List[str], filter_config: Dict[str, Any], stream_attributes: List[str]): + """Validate filters and check for attribute name typos across multiple attribute files.""" + vprint("Validating filters and checking typos across all attribute files...") + + # Extract filter attributes from config + filter_attributes = extract_filter_attributes(filter_config) + + # Sample and extract attributes from all files + all_sampled_attributes = set() + for attr_file_path in attr_file_paths: + sampled_attributes = sample_and_extract_attributes(attr_file_path) + all_sampled_attributes.update(sampled_attributes) + + # Check if all mixer config filters are found + missing_attributes = filter_attributes - all_sampled_attributes + + if not missing_attributes: + print("All mixer config filters are FOUND in the attribute files.\n") + else: + print("Warning: Some mixer config filters were not found in the attribute files.") + print("Missing attributes:") + for attr in missing_attributes: + print(f" - {attr}") + + vprint("\nAll attributes found in files:") + for attr in sorted(all_sampled_attributes): + vprint(f" - {attr}") + + vprint("\nThis detailed list is provided to help identify potential typos or misconfigurations\n") + +def sample_and_extract_attributes(attr_file_path: str, num_samples: int = 5) -> set: + """Sample lines from the attribute file and extract unique attributes.""" + sampled_attributes = set() + sampled_lines = sample_file_lines(attr_file_path, num_lines=num_samples) + + for line in sampled_lines: + try: + data = json.loads(line) + sampled_attributes.update(data['attributes'].keys()) + except (json.JSONDecodeError, KeyError): + print(f"Error: Invalid JSON or missing 'attributes' key in sampled line from {attr_file_path}") + + return sampled_attributes + +def evaluate_comparison(value: Union[int, float], op: str, comparison_value: Union[int, float]) -> bool: + if op == '==': + return value == comparison_value + elif op == '!=': + return value != comparison_value + elif op == '<': + return value < comparison_value + elif op == '>': + return value > comparison_value + elif op == '<=': + return value <= comparison_value + elif op == '>=': + return value >= comparison_value + else: + raise ValueError(f"Unsupported operator: {op}") + +def evaluate_jsonpath_condition(data: Dict[str, Any], condition: str) -> bool: + # print(f"\nEvaluating condition: {condition}") + try: + # Check if the condition contains a comparison + match = re.match(r'(.*?)\s*(==|<=|>=|<|>)\s*(.*)', condition) + if match: + path, op, comparison = match.groups() + comparison_value = float(comparison.strip()) + + # Extract all indices from the path + indices = re.findall(r'\[(\d+)\]', path) + indices = [int(index) for index in indices] + + # Remove all indices from the path for evaluation + path = re.sub(r'\[\d+\]', '', path) + + # Evaluate the path part + jsonpath_expr = parse_jsonpath(path.strip()) + matches = jsonpath_expr.find(data) + + if matches: + value = matches[0].value + # Apply all indices sequentially + for index in indices: + if isinstance(value, list) and len(value) > index: + value = value[index] + else: + print(f" Index {index} is out of range or value is not a list") + return False + + result = evaluate_comparison(value, op, comparison_value) + # 
print(f" Comparison: {value} {op} {comparison_value}") + # print(f" Result: {result}") + return result + else: + print(" No matches found for the path") + return False + else: + # Regular JSONPath without comparison + jsonpath_expr = parse_jsonpath(condition) + matches = jsonpath_expr.find(data) + return len(matches) > 0 + except Exception as e: + print(f"Error evaluating condition: {e}") + return False + +def split_complex_jsonpath(complex_expression: str) -> List[str]: + # Remove the outer quotes, brackets, and any leading/trailing whitespace + expression = complex_expression.strip().strip('"- ').strip('[]') + + # Check for and remove '@' only if it appears in "$@." pattern at the beginning + if expression.startswith('$@.'): + expression = '$' + expression[2:] + + # Remove the initial "$.attributes[?(" and the trailing ")]" + expression = re.sub(r'^\$\.attributes\[\?\(|\)\]$', '', expression) + + # Split the expression by '&&' but keep the JSONPath intact + parts = re.split(r'\s*&&\s*', expression) + + conditions = [] + base_path = "$.attributes" + + for part in parts: + # Remove the '@.' at the beginning of each condition + part = part.replace('@.', '') + + # Remove any trailing parenthesis + part = part.rstrip(')') + + # Check if it's a comparison condition + if any(op in part for op in ['<', '>', '==', '<=', '>=']): + conditions.append(f"{base_path}.{part}") + else: + # For existence checks, we don't need to change anything + conditions.append(f"{base_path}.{part}") + + return conditions + +def prepare_filter(filter_expr: str, syntax: str) -> Union[List[str], Any]: + if syntax == 'jsonpath': + return split_complex_jsonpath(filter_expr) + elif syntax == 'jq': + return jq.compile(filter_expr) + else: + raise ValueError(f"Unsupported filter syntax: {syntax}. Supported options are 'jsonpath' and 'jq'.") + +def execute_filter_commands(attr_samples_dict: Dict[str, List[str]], attributes_list: List[str], filter_config: Dict[str, Any], num_lines: int = 100) -> Dict[str, int]: + """ + Execute filter commands on sampled lines from documents and their attributes. + Supports both JQ and JSONPath expressions based on the specified syntax. 
+ """ + print(f"Executing filter commands on attribute files, sampling {num_lines} lines for 1 file and their attributes.") + + results = { + 'total_lines': 0, + 'lines_excluded': 0, + 'lines_passed': 0, + 'errors': 0 + } + + include_filters = filter_config.get('include', []) + exclude_filters = filter_config.get('exclude', []) + syntax = filter_config.get('syntax', 'jsonpath') + + # Compile JQ expressions or prepare JSONPath expressions based on the specified syntax + try: + include_filters_compiled = [prepare_filter(filter_expr, syntax) for filter_expr in include_filters] + exclude_filters_compiled = [prepare_filter(filter_expr, syntax) for filter_expr in exclude_filters] + except ValueError as e: + print(f"Error preparing filters: {str(e)}") + return results + + # Step 1: Sample lines from each attribute file and build a list of combined attribute data + attr_lines_list = [] + for attr_name in attributes_list: + attr_paths = attr_samples_dict.get(attr_name, []) + if not attr_paths: + print(f"No attribute files found for '{attr_name}'.") + return results + attr_path = attr_paths[0] + attr_lines = sample_file_lines(attr_path, num_lines) + if not attr_lines: + print(f"No lines sampled from attribute file '{attr_path}'.") + return results + attr_lines_list.append(attr_lines) + + for lines_tuple in zip(*attr_lines_list): + results['total_lines'] += 1 + combined_attr_data = {'attributes': {}} + doc_id = None + for line in lines_tuple: + try: + attr_data = json.loads(line) + if doc_id is None: + doc_id = attr_data.get('id') + combined_attr_data['id'] = doc_id + elif doc_id != attr_data.get('id'): + print(f"Mismatch in doc_ids: {doc_id} != {attr_data.get('id')}") + results['errors'] += 1 + break + combined_attr_data['attributes'].update(attr_data.get('attributes', {})) + except json.JSONDecodeError: + results['errors'] += 1 + break + except Exception as e: + print(f"Error processing line: {str(e)}") + results['errors'] += 1 + break + else: + # No break occurred, so process filters + try: + # Apply include filters + if syntax == 'jsonpath': + include_passed = all( + all(evaluate_jsonpath_condition(combined_attr_data, condition) for condition in filter_expr) + for filter_expr in include_filters_compiled + ) + else: # JQ + include_passed = all( + filter_expr.input(combined_attr_data).first() + for filter_expr in include_filters_compiled + ) + + # Apply exclude filters if include filters passed + if include_passed: + if syntax == 'jsonpath': + exclude_matched = any( + all(evaluate_jsonpath_condition(combined_attr_data, condition) for condition in filter_expr) + for filter_expr in exclude_filters_compiled + ) + else: # JQ + exclude_matched = any( + filter_expr.input(combined_attr_data).first() + for filter_expr in exclude_filters_compiled + ) + + if exclude_matched: + results['lines_excluded'] += 1 + else: + results['lines_passed'] += 1 + else: + results['lines_excluded'] += 1 + except Exception as e: + print(f"Error applying filters: {str(e)}") + results['errors'] += 1 + + print("\nFilter execution results:") + print(f"Total lines processed: {results['total_lines']}") + print(f"Lines passed all filters: {results['lines_passed']}") + print(f"Lines excluded by filters: {results['lines_excluded']}") + print(f"Errors encountered: {results['errors']}\n") + + return results + +def extract_attribute_names_from_filters(filters: List[str]) -> set: + attribute_names = set() + for filter_expr in filters: + # Extract attribute names from JSONPath expressions + matches = re.findall(r'@\.([a-zA-Z0-9_]+)', 
filter_expr) + attribute_names.update(matches) + return attribute_names + +def extract_filter_attributes(filter_config: Dict[str, Any]) -> set: + """Extract attribute names from filter expressions.""" + filter_attributes = set() + for filter_type in ['include', 'exclude']: + for filter_expr in filter_config.get(filter_type, []): + # Extract attribute names from JSONPath expressions + matches = re.findall(r'@\.([a-zA-Z_][a-zA-Z0-9_]*)', filter_expr) + filter_attributes.update(matches) + return filter_attributes + diff --git a/scripts/validate_mixer/main.py b/scripts/validate_mixer/main.py new file mode 100644 index 00000000..2a25c99e --- /dev/null +++ b/scripts/validate_mixer/main.py @@ -0,0 +1,43 @@ +import sys +import signal +import sys +import signal +import argparse +from validator import load_and_validate_config, validate_s3_paths_and_permissions, validate_stream_filters, validate_documents_and_attributes +from utils import keyboard_interrupt_handler, set_verbose +from env_handler import load_env_variables + +def main(config_path, num_samples, verbose): + # Register the keyboard interrupt handler + signal.signal(signal.SIGINT, keyboard_interrupt_handler) + + # Set verbose mode + set_verbose(args.verbose) + + load_env_variables() + config = load_and_validate_config(config_path) + if config is None: + print("Configuration loading or validation FAILED") + + if not validate_s3_paths_and_permissions(config): + print("S3 path validation FAILED") + # return + + if not validate_stream_filters(config): + print("Filter validation FAILED.\n") + return + + if not validate_documents_and_attributes(config, num_samples): + print("Document and attribute validation FAILED") + return + + print("Validation FINISHED!") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Validate mixer configuration") + parser.add_argument("config_path", help="Path to the configuration file") + parser.add_argument("--num_samples", type=int, default=1, help="Number of file samples to validate") + parser.add_argument("--verbose", action="store_true", help="Enable verbose output") + args = parser.parse_args() + + main(args.config_path, args.num_samples, args.verbose) diff --git a/scripts/validate_mixer/s3_utils.py b/scripts/validate_mixer/s3_utils.py new file mode 100644 index 00000000..86cac9ec --- /dev/null +++ b/scripts/validate_mixer/s3_utils.py @@ -0,0 +1,81 @@ +import re +from typing import Tuple, Generator, List +from botocore.exceptions import ClientError +import boto3 + +s3_client = boto3.client('s3') + +def validate_s3_path(s3_path: str) -> Tuple[bool, str | None]: + """Validate an S3 path.""" + pattern = r'^s3://[\w.-]+/.*$' + if not re.match(pattern, s3_path): + return False, f"Invalid S3 path format: {s3_path}" + return True, None + +def check_s3_path_exists(s3_path: str) -> Tuple[bool, str | None]: + """Check if an S3 path exists and is accessible.""" + try: + bucket, key = s3_path[5:].split('/', 1) + if key.endswith('/'): + # For directories, we just need to check if the prefix exists + response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1) + if 'Contents' not in response: + return False, f"S3 path does not exist or is empty: {s3_path}" + else: + # For files, we can use head_object + s3_client.head_object(Bucket=bucket, Key=key) + return True, None + except ClientError as e: + return False, f"S3 path does not exist or is not accessible: {s3_path}. 
Error: {str(e)}" + +def check_s3_path_writable(s3_path: str) -> Tuple[bool, str | None]: + """Check if an S3 path is writable.""" + try: + bucket, key = s3_path[5:].split('/', 1) + # Ensure the key ends with a '/' to treat it as a directory + if not key.endswith('/'): + key += '/' + s3_client.put_object(Bucket=bucket, Key=f"{key}test_write", Body=b'') + s3_client.delete_object(Bucket=bucket, Key=f"{key}test_write") + return True, None + except ClientError as e: + return False, f"S3 path is not writable: {s3_path}. Error: {str(e)}" + +def check_s3_parent_exists(s3_path: str) -> Tuple[bool, str | None]: + """Check if the parent directory of an S3 path exists.""" + parent_path = '/'.join(s3_path.split('/')[:-1]) + '/' + return check_s3_path_exists(parent_path) + +def list_s3_objects(s3_path: str) -> Generator[str, None, None]: + """List objects in an S3 path, handling wildcards.""" + + bucket, prefix = s3_path[5:].split('/', 1) + + # Remove '**/' from the prefix + prefix = prefix.replace('**/', '') + + # Remove the filename pattern (e.g., '*.jsonl.gz') from the prefix + prefix = '/'.join(prefix.split('/')[:-1]) + '/' + + paginator = s3_client.get_paginator('list_objects_v2') + + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + if 'Contents' in page: + for obj in page['Contents']: + yield f"s3://{bucket}/{obj['Key']}" + +def get_base_path(s3_path: str) -> str: + """Extract the base path from an S3 path with wildcards.""" + parts = s3_path.split('/') + base_parts = [] + for part in parts: + if part == '**': + break + base_parts.append(part) + return '/'.join(base_parts) + +def get_corresponding_attribute_path(doc_path: str, base_doc_path: str, base_attr_path: str, attr_type: str) -> str: + """Get the corresponding attribute path for a given document path and attribute type.""" + relative_path = doc_path.replace(base_doc_path, '', 1) + relative_path = relative_path.lstrip('/') + return f"{base_attr_path.rstrip('/')}/{attr_type}/{relative_path}" \ No newline at end of file diff --git a/scripts/validate_mixer/utils.py b/scripts/validate_mixer/utils.py new file mode 100644 index 00000000..a2508b88 --- /dev/null +++ b/scripts/validate_mixer/utils.py @@ -0,0 +1,26 @@ +import sys +import signal +import re +from typing import List, Set +import builtins + +def keyboard_interrupt_handler(signal: int, frame: object) -> None: + """ + Handle keyboard interrupt (Ctrl+C) gracefully. 
+ + :param signal: Signal number + :param frame: Current stack frame + """ + print("\n\nScript interrupted by user") + sys.exit(0) + + +VERBOSE = False + +def set_verbose(verbose): + global VERBOSE + VERBOSE = verbose + +def vprint(*args, **kwargs): + if VERBOSE: + builtins.print(*args, **kwargs) \ No newline at end of file diff --git a/scripts/validate_mixer/validator.py b/scripts/validate_mixer/validator.py new file mode 100644 index 00000000..ffdecc26 --- /dev/null +++ b/scripts/validate_mixer/validator.py @@ -0,0 +1,232 @@ +import re +import os +import shutil +import sys +from typing import Dict, List, Tuple, Any, Optional +from dotenv import load_dotenv + +from s3_utils import ( + validate_s3_path, + check_s3_path_exists, + check_s3_parent_exists, + check_s3_path_writable, + list_s3_objects, + get_base_path +) +from file_operations import ( + sample_and_download_files, + count_file_lines, + validate_jsonl +) +from filter_operations import ( + validate_filter_expressions, + execute_filter_commands, + extract_attribute_names_from_filters, + validate_filters_and_check_typos +) +from config_handler import load_config, validate_config_structure + +from env_handler import load_env_variables + +from utils import vprint + +def load_and_validate_config(config_path): + #loading environment variables + load_env_variables() + + vprint("Validating configuration file...") + try: + config = load_config(config_path) + + except FileNotFoundError as e: + print(str(e)) + print("Please check the file path and try again.") + sys.exit(1) + except ValueError as e: + print(f"Error loading or validating config: {str(e)}") + sys.exit(1) + + vprint("Validating configuration structure...") + errors = validate_config_structure(config) + + if errors: + print("Configuration validation FAILED. 
Errors:") + for error in errors: + print(f"- {error}") + return None + else: + print("Configuration validation SUCCESSFUL.\n") + return config + +def validate_s3_paths_and_permissions(config: Dict[str, Any]) -> bool: + vprint("Validating S3 paths and permissions...") + for stream in config['streams']: + base_doc_path = get_base_path(stream['documents'][0]) + base_attr_path = re.sub(r'/documents($|/)', r'/attributes\1', base_doc_path) + + # Validate document patterns + for doc_pattern in stream['documents']: + is_valid, error = validate_s3_path(doc_pattern) + if not is_valid: + print(f"Error: {error}") + else: + matching_objects = [obj for obj in list_s3_objects(doc_pattern) if not obj.endswith('/')] + if not matching_objects: + print(f"Warning: No objects found matching pattern: {doc_pattern}") + else: + vprint(f"Found {len(matching_objects)} objects matching pattern: {doc_pattern}") + + # Validate output path + output_path = stream['output']['path'] + is_valid, error = validate_s3_path(output_path) + if not is_valid: + print(f"Error: {error}") + else: + parent_exists, error = check_s3_parent_exists(output_path) + if not parent_exists: + print(f"Error: Parent directory does not exist for output path: {output_path}") + writable, error = check_s3_path_writable(output_path) + if not writable: + print(f"Error: {error}") + return False + + # Validate attribute paths + for attr in stream['attributes']: + attr_path = f"{base_attr_path}/{attr}/" + is_valid, error = validate_s3_path(attr_path) + if not is_valid: + print(f"Error: {error}") + else: + exists, error = check_s3_path_exists(attr_path) + if not exists: + print(f"Error: Attribute path does not exist: {attr_path}") + else: + vprint(f"Found attribute path: {attr_path}") + objects = list(list_s3_objects(attr_path)) + if not objects: + print(f" Warning: No objects found in this attribute path") + + return True # All validations passed + +def validate_stream_filters(config: Dict[str, Any]) -> bool: + vprint("\nValidating filter expressions...") + all_valid = True + + for stream in config['streams']: + if 'filter' in stream: + filter_config = stream['filter'] + filter_errors, filter_warnings = validate_filter_expressions(filter_config) + + if filter_warnings: + vprint(f"Warnings in filter configuration for stream '{stream['name']}':") + for warning in filter_warnings: + vprint(f"- Warning: {warning}") + + if filter_errors: + all_valid = False + print(f"Errors in filter expressions for stream '{stream['name']}':") + for error in filter_errors: + print(f"- {error}") + print("Filters validation COMPLETE.\n") + + return all_valid + +def validate_documents_and_attributes(config: Dict[str, Any], num_samples: int) -> bool: + vprint("Sampling files...") + temp_dir = "temp_sample_files" + try: + for stream in config['streams']: + filter_attributes = set() + if 'filter' in stream: + include_filters = stream['filter'].get('include', []) + exclude_filters = stream['filter'].get('exclude', []) + filter_attributes = extract_attribute_names_from_filters(include_filters + exclude_filters) + + base_doc_path = get_base_path(stream['documents'][0]) + base_attr_path = re.sub(r'/documents($|/)', r'/attributes\1', base_doc_path) + + try: + doc_samples, attr_samples_dict = sample_and_download_files(stream, num_samples) + except Exception as e: + print(f"Error during file sampling and downloading: {str(e)}") + return False + + if not doc_samples: + print("No document samples were successfully downloaded. 
Skipping further validation for this stream.") + continue + + for doc_sample in doc_samples: + vprint(f"\nValidating file: {doc_sample}") + + doc_line_count = count_file_lines(doc_sample) + if doc_line_count == -1: + print(f"Failed to count lines in document file {doc_sample}. Skipping the file") + continue + + vprint(f"Document has {doc_line_count} lines") + + doc_expected_fields = {'id', 'text', 'source', 'created', 'added', 'version', 'metadata', 'attributes'} + is_valid, error_messages = validate_jsonl(doc_sample, doc_expected_fields) + if not is_valid: + print("Document validation failed:") + for error in error_messages: + print(f" {error}") + return False + + for attr_type in stream['attributes']: + if attr_type not in attr_samples_dict or not attr_samples_dict[attr_type]: + print(f"Warning: No attribute samples found for {attr_type}. Skipping validation for this attribute type.") + continue + + try: + doc_index = doc_samples.index(doc_sample) + if doc_index >= len(attr_samples_dict[attr_type]): + print(f"Warning: No corresponding attribute file for document {doc_sample} and attribute type {attr_type}. Skipping validation for this attribute.") + continue + attr_sample = attr_samples_dict[attr_type][doc_index] + except ValueError: + print(f"Warning: Document {doc_sample} not found in samples. Skipping validation for this document.") + continue + + vprint(f"\nValidating attribute file: {attr_sample}") + + attr_line_count = count_file_lines(attr_sample) + if attr_line_count == -1: + print("Failed to count lines in attribute file. Skipping further validation for this attribute.") + continue + + vprint(f"Attribute file has {attr_line_count} lines") + + if doc_line_count != attr_line_count: + print(f"Document: {doc_sample}") + print(f"Attribute: {attr_sample}") + print(f"ERROR: Line count mismatch! 
Document has {doc_line_count} lines, but attribute file has {attr_line_count} lines.") + return False + else: + vprint("Line count check PASSED: Document and attribute file have the same number of lines.") + + attr_expected_fields = {'id', 'attributes'} + is_valid, error_messages = validate_jsonl(attr_sample, attr_expected_fields) + if not is_valid: + print("Warning: possible attribute validation mismatch:") + for error in error_messages: + print(f" {error}") + else: + print("Attribute validation PASSED\n") + + if 'filter' in stream: + validate_filters_and_check_typos([attr_sample for attr_samples in attr_samples_dict.values() for attr_sample in attr_samples], stream['filter'], stream['attributes']) + + filter_execution_results = execute_filter_commands(attr_samples_dict, stream['attributes'], stream['filter'], num_lines=100) + vprint(filter_execution_results) + + return True + finally: + # Clean up: remove the temporary directory and its contents + if temp_dir and os.path.exists(temp_dir): + try: + shutil.rmtree(temp_dir) + # print(f"Temporary directory '{temp_dir}' has been removed.") + except Exception as e: + print(f"Error while removing temporary directory '{temp_dir}': {str(e)}") + diff --git a/tests/config/mixer-validator-jq.yaml b/tests/config/mixer-validator-jq.yaml new file mode 100644 index 00000000..5c6c4ec7 --- /dev/null +++ b/tests/config/mixer-validator-jq.yaml @@ -0,0 +1,51 @@ +streams: + - name: cc_tiny_subset + documents: + - s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/documents/**/*.jsonl.gz + + attributes: + - cc_tiny_subset_analysis_october + - bff_duplicate_paragraph_spans_new + + output: + path: s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/mixed-output-exclude-bff + max_size_in_bytes: 4294967296 + discard_fields: + - attributes + + filter: + syntax: jq + include: [] + exclude: + # Language filter (using both cld2 and fasttext) + - (.attributes.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 0.8 else false end) + - (.attributes.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 0.8 else false end) + # Document length filter + - (.attributes.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 100 else false end) + # NSFW content filter + - (.attributes.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.5 else false end) + # Gopher quality filter + - (.attributes.cc_tiny_subset_analysis_october__gopher_v2__word_count | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 50 else false end) + - (.attributes.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.1 else false end) + # Deduplication filter (BFF) + - (.attributes.bff_duplicate_paragraph_spans_new | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] >= 1.0 else false end) + # C4 quality filter + - (.attributes.cc_tiny_subset_analysis_october__c4_v2__line_bullets | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.5 else false end) + - 
(.attributes.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.3 else false end) + + span_replacement: + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__EMAIL_ADDRESS" + min_score: 0.5 + replacement: " |||EMAIL_ADDRESS||| " + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__PHONE_NUMBER" + min_score: 0.5 + replacement: " |||PHONE_NUMBER||| " + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__IP_ADDRESS" + min_score: 0.5 + replacement: " |||IP_ADDRESS||| " + +work_dir: + input: "/tmp/cc_tiny_subset_mix/input" + output: "/tmp/cc_tiny_subset_mix/output" + +processes: 16 \ No newline at end of file diff --git a/tests/config/mixer-validator-jsonpath.yaml b/tests/config/mixer-validator-jsonpath.yaml new file mode 100644 index 00000000..106eedaf --- /dev/null +++ b/tests/config/mixer-validator-jsonpath.yaml @@ -0,0 +1,50 @@ +streams: + - name: cc_tiny_subset + documents: + - s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/documents/**/*.jsonl.gz + + attributes: + - cc_tiny_subset_analysis_october + - bff_duplicate_paragraph_spans_new + + output: + path: s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/mixed-output + max_size_in_bytes: 4294967296 + discard_fields: + - attributes + + filter: + include: [] + exclude: + # Language filter (using both cld2 and fasttext) + - "$.attributes[?(@.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en && @.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en[0] && @.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en[0][2] < 0.8)]" + - "$.attributes[?(@.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en && @.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en[0] && @.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en[0][2] < 0.8)]" + # Document length filter + - "$.attributes[?(@.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document && @.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document[0] && @.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document[0][2] < 100)]" + # Deduplication filter + - "$@.attributes[?(@.bff_duplicate_paragraph_spans_new && @.bff_duplicate_paragraph_spans_new[0] && @.bff_duplicate_paragraph_spans_new[0][2] >= 1.0)]" + # NSFW content filter + - "$.attributes[?(@.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw && @.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw[0] && @.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw[0][2] > 0.5)]" + # Gopher quality filter (example, adjust threshold as needed) + - "$.attributes[?(@.cc_tiny_subset_analysis_october__gopher_v2__word_count && @.cc_tiny_subset_analysis_october__gopher_v2__word_count[0] && @.cc_tiny_subset_analysis_october__gopher_v2__word_count[0][2] < 50)]" + - "$.attributes[?(@.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio && @.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio[0] && @.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio[0][2] > 0.1)]" + # C4 quality filter (example, adjust threshold as needed) + - "$.attributes[?(@.cc_tiny_subset_analysis_october__c4_v2__line_bullets && 
@.cc_tiny_subset_analysis_october__c4_v2__line_bullets[0] && @.cc_tiny_subset_analysis_october__c4_v2__line_bullets[0][2] > 0.5)]" + - "$.attributes[?(@.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat && @.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat[0] && @.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat[0][2] > 0.3)]" + + span_replacement: + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__EMAIL_ADDRESS" + min_score: 0.5 + replacement: " |||EMAIL_ADDRESS||| " + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__PHONE_NUMBER" + min_score: 0.5 + replacement: " |||PHONE_NUMBER||| " + - span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__IP_ADDRESS" + min_score: 0.5 + replacement: " |||IP_ADDRESS||| " + +work_dir: + input: "/tmp/cc_tiny_subset_mix/input" + output: "/tmp/cc_tiny_subset_mix/output" + +processes: 16 \ No newline at end of file