diff --git a/reproschema/to_reproschema.py b/reproschema/to_reproschema.py index fe45aa2..c1e913a 100755 --- a/reproschema/to_reproschema.py +++ b/reproschema/to_reproschema.py @@ -1,5 +1,7 @@ import argparse +import os import re +import shutil from pathlib import Path from typing import Any, Dict, List @@ -9,14 +11,12 @@ from .context_url import CONTEXTFILE_URL from .jsonldutils import get_context_version -from .mappings import ( - ADDITIONAL_NOTES_LIST, - CSV_TO_REPROSCHEMA_MAP, - INPUT_TYPE_MAP, - VALUE_TYPE_MAP, -) +from .mappings import CSV_TO_REPROSCHEMA_MAP, INPUT_TYPE_MAP, VALUE_TYPE_MAP, ADDITIONAL_NOTES_LIST from .models import Activity, Item, Protocol, write_obj_jsonld +import warnings +from bs4 import BeautifulSoup, MarkupResemblesLocatorWarning +warnings.filterwarnings("ignore", category=MarkupResemblesLocatorWarning) def load_config(config_file: str) -> Dict[str, Any]: with open(config_file, "r") as f: @@ -25,251 +25,97 @@ def load_config(config_file: str) -> Dict[str, Any]: class ReproSchemaConverter: def __init__(self, config: Dict[str, Any]): self.config = config - self.csv_to_reproschema_map = CSV_TO_REPROSCHEMA_MAP - self.value_type_map = VALUE_TYPE_MAP - self.input_type_map = INPUT_TYPE_MAP - self.additional_notes_columns = ADDITIONAL_NOTES_LIST - self.branch_logic_pattern = re.compile( - r"\[([^\]]+)\]|\b(AND|OR)\b|([^> pd.DataFrame: + df = pd.read_csv(csv_file) + df.columns = df.columns.str.strip().str.replace('"', '') + return self.preprocess_fields(df) + + def preprocess_fields(self, df: pd.DataFrame) -> pd.DataFrame: + special_fields = [col for col in df.columns if col.endswith(('_Validity', '_Administration', '_Informant'))] + for field in special_fields: + df[field] = df[field].apply(lambda x: x.replace('>', '>').replace('\n', '').replace('\r', '') if isinstance(x, str) else x) + return df def process_response_options(self, response_option_str: str, item_name: str) -> tuple: if pd.isna(response_option_str): return [], ['xsd:string'] - # Special handling for Validity and Administration fields - if item_name.endswith('_Validity') or item_name.endswith('_Administration'): - return self.process_validity_administration_options(item_name) - - response_option = [] - response_option_value_type = set() - - choices = response_option_str.split('{-}') - for choice in choices: - choice = choice.strip() - if choice == "NULL=>''": - response_option.append({'name': {'en': 'NULL'}, 'value': None}) - response_option_value_type.add('xsd:string') - elif '=>' in choice: - value, name = choice.split('=>') - value = value.strip("'").strip('"') - name = name.strip("'").strip('"') - if value.lower() == 'null': - value = None - response_option.append({'name': {'en': name}, 'value': value}) - response_option_value_type.add('xsd:string') - else: - print(f"Warning: Unexpected choice format '{choice}' in {item_name} field") - - if not response_option: - print(f"Warning: No valid choices found for {item_name}") - response_option.append({'name': {'en': 'No valid choices'}, 'value': None}) - - return response_option, list(response_option_value_type) - - def process_validity_administration_options(self, item_name: str) -> tuple: - if item_name.endswith('_Validity'): - choices = [ - {'name': {'en': 'Questionable'}, 'value': 'Questionable'}, - {'name': {'en': 'Invalid'}, 'value': 'Invalid'}, - {'name': {'en': 'Valid'}, 'value': 'Valid'} - ] - elif item_name.endswith('_Administration'): - choices = [ - {'name': {'en': 'None'}, 'value': 'None'}, - {'name': {'en': 'Partial'}, 'value': 'Partial'}, - {'name': {'en': 'All'}, 'value': 'All'} - ] - else: - print(f"Warning: Unexpected field type for {item_name}") - choices = [{'name': {'en': 'No valid choices'}, 'value': None}] - - return choices, ['xsd:string'] - - def preprocess_validity_administration_fields(self, df: pd.DataFrame) -> pd.DataFrame: - validity_admin_fields = [col for col in df.columns if col.endswith('_Validity') or col.endswith('_Administration')] - for field in validity_admin_fields: - df[field] = "''Questionable'=>'Questionable'{-}'Invalid'=>'Invalid'{-}'Valid'=>'Valid'" if field.endswith('_Validity') else "''None'=>'None'{-}'Partial'=>'Partial'{-}'All'=>'All'" - return df + choices = [] + value_types = set() - def load_csv(self, csv_file: str) -> pd.DataFrame: - try: - df = pd.read_csv(csv_file) - df.columns = df.columns.str.strip().str.replace('"', '') - df = self.preprocess_validity_administration_fields(df) - - # Log the first few rows of problematic columns - problem_columns = [col for col in df.columns if col.endswith('_Validity') or col.endswith('_Administration')] - for col in problem_columns: - print(f"\nFirst few entries of {col}:") - print(df[col].head()) - - self.log_response_options(df) - return df - except Exception as e: - print(f"Error loading CSV file: {str(e)}") - raise + if item_name.endswith(('_Validity', '_Administration', '_Informant')): + pattern = r"''([^']+)'(?:=>|=>)'([^']+)''" + matches = re.findall(pattern, response_option_str) + choices = [{'name': {'en': name}, 'value': value} for value, name in matches] + else: + for choice in response_option_str.split('{-}'): + choice = choice.strip() + if '=>' in choice: + value, name = map(lambda x: x.strip("'").strip('"'), choice.split('=>')) + choices.append({'name': {'en': name}, 'value': None if value.lower() == 'null' else value}) + elif choice != "NULL=>''": + print(f"Warning: Unexpected choice format '{choice}' in {item_name} field") + + if not choices: + print(f"Warning: No valid choices found for {item_name}") + choices.append({'name': {'en': 'No valid choices'}, 'value': None}) - def process_dataframe(self, df: pd.DataFrame) -> Dict[str, Dict[str, Any]]: - grouped = df.groupby(self.csv_to_reproschema_map["activity_name"]) - activities = {} - for activity_name, group in grouped: - items = [ - self.process_item(item) for item in group.to_dict("records") - ] - activities[activity_name] = { - "items": items, - "order": [f"items/{item['id']}" for item in items], - "compute": self.generate_compute_section(items), - } - return activities + value_types.add('xsd:string') + return choices, list(value_types) + def clean_html(self, raw_html: str) -> str: + if pd.isna(raw_html): + return "" + text = str(raw_html) + if "<" in text and ">" in text: + return BeautifulSoup(text, "html.parser").get_text() + return text + def process_item(self, item: Dict[str, Any]) -> Dict[str, Any]: - input_type = self.input_type_map.get( - item[self.csv_to_reproschema_map["inputType"]], "text" - ) item_data = { "category": "reproschema:Item", - "id": item[self.csv_to_reproschema_map["item_name"]], - "prefLabel": { - "en": item[self.csv_to_reproschema_map["item_name"]] - }, - "question": { - "en": self.clean_html( - item[self.csv_to_reproschema_map["question"]] - ) - }, - "ui": {"inputType": input_type}, + "id": item[CSV_TO_REPROSCHEMA_MAP["item_name"]], + "prefLabel": {"en": item[CSV_TO_REPROSCHEMA_MAP["item_name"]]}, + "question": {"en": self.clean_html(item[CSV_TO_REPROSCHEMA_MAP["question"]])}, + "ui": {"inputType": INPUT_TYPE_MAP.get(item[CSV_TO_REPROSCHEMA_MAP["inputType"]], "text")}, "responseOptions": { - "valueType": self.determine_value_type(item), - "multipleChoice": item[ - self.csv_to_reproschema_map["inputType"] - ] - == "Multi-select", + "valueType": [VALUE_TYPE_MAP.get(str(item.get(CSV_TO_REPROSCHEMA_MAP.get("validation", ""), "")).strip(), "xsd:string")], + "multipleChoice": item[CSV_TO_REPROSCHEMA_MAP["inputType"]] == "Multi-select", }, } - if self.csv_to_reproschema_map["response_option"] in item: - ( - item_data["responseOptions"]["choices"], - item_data["responseOptions"]["valueType"], - ) = self.process_response_options( - item[self.csv_to_reproschema_map["response_option"]], - item[self.csv_to_reproschema_map["item_name"]], + if CSV_TO_REPROSCHEMA_MAP["response_option"] in item: + item_data["responseOptions"]["choices"], item_data["responseOptions"]["valueType"] = self.process_response_options( + item[CSV_TO_REPROSCHEMA_MAP["response_option"]], + item[CSV_TO_REPROSCHEMA_MAP["item_name"]], ) - item_data["additionalNotesObj"] = self.process_additional_notes(item) - - return item_data - - def determine_value_type(self, item: Dict[str, Any]) -> List[str]: - validation_type = item.get( - self.csv_to_reproschema_map.get("validation", ""), "" - ) - - # Ensure validation_type is a string before stripping - if pd.isna(validation_type): - validation_type = "" - else: - validation_type = str(validation_type).strip() - - return [self.value_type_map.get(validation_type, "xsd:string")] - - def process_response_options(self, response_option_str: str, item_name: str) -> tuple: - if pd.isna(response_option_str): - return [], ['xsd:string'] - - response_option = [] - response_option_value_type = set() - - choices = response_option_str.split('{-}') - for choice in choices: - choice = choice.strip() - if choice == "NULL=>''": - # Handle NULL case - response_option.append({'name': {'en': 'NULL'}, 'value': None}) - response_option_value_type.add('xsd:string') - elif '=>' in choice: - # Handle cases like ''Questionable'=>'Questionable'' or '0'=>'No' - value, name = choice.split('=>') - value = value.strip("'").strip('"') - name = name.strip("'").strip('"') - if value.lower() == 'null': - value = None - response_option.append({'name': {'en': name}, 'value': value}) - response_option_value_type.add('xsd:string') - else: - print(f"Warning: Unexpected choice format '{choice}' in {item_name} field") - - if not response_option: - print(f"Warning: No valid choices found for {item_name}") - # Add a default option to prevent empty choices - response_option.append({'name': {'en': 'No valid choices'}, 'value': None}) - - return response_option, list(response_option_value_type) - - def process_additional_notes( - self, item: Dict[str, Any] - ) -> List[Dict[str, str]]: - return [ + item_data["additionalNotesObj"] = [ {"source": "redcap", "column": column, "value": item[column]} - for column in self.additional_notes_columns + for column in ADDITIONAL_NOTES_LIST if column in item and item[column] ] - def clean_html(self, raw_html: str) -> str: - if pd.isna(raw_html): - return "" - if "<" in str(raw_html) and ">" in str(raw_html): - return BeautifulSoup(str(raw_html), "html.parser").get_text() - return str(raw_html) - - def generate_compute_section( - self, items: List[Dict[str, Any]] - ) -> List[Dict[str, str]]: - compute_items = [] - for item in items: - item_id = item["id"].lower() - if "_score" in item_id or item_id.endswith("_raw"): - compute_items.append( + return item_data + + def process_dataframe(self, df: pd.DataFrame) -> Dict[str, Dict[str, Any]]: + activities = {} + for activity_name, group in df.groupby(CSV_TO_REPROSCHEMA_MAP["activity_name"]): + items = [self.process_item(item) for item in group.to_dict("records")] + activities[activity_name] = { + "items": items, + "order": [f"items/{item['id']}" for item in items], + "compute": [ {"variableName": item["id"], "jsExpression": ""} - ) - return compute_items - - def branch_logic(self, condition_str): - if not condition_str: - return "true" - - def replace_func(match): - if match.group(1): # [variable] -> variable - return match.group(1) - elif match.group(2): # AND|OR -> && or || - return " && " if match.group(2).lower() == "and" else " || " - elif match.group(3): # single = -> === - return match.group(3) + "===" - elif match.group(4): # sum() -> reduce() - return f"[{match.group(4)}].reduce((a, b) => a + b, 0)" - - return self.branch_logic_pattern.sub(replace_func, condition_str) - - def create_activity_schema( - self, - activity_name: str, - activity_data: Dict[str, Any], - output_path: Path, - redcap_version: str, - ): + for item in items + if "_score" in item["id"].lower() or item["id"].lower().endswith("_raw") + ], + } + return activities + + def create_activity_schema(self, activity_name: str, activity_data: Dict[str, Any], output_path: Path, redcap_version: str): json_ld = { "category": "reproschema:Activity", "id": f"{activity_name}_schema", @@ -283,7 +129,11 @@ def create_activity_schema( "variableName": item["id"], "isAbout": f"items/{item['id']}", "valueRequired": item.get("valueRequired", False), - "isVis": not self.should_hide_item(item), + "isVis": not ( + "_score" in item["id"].lower() + or item["id"].lower().endswith("_raw") + or "@HIDDEN" in item.get("annotation", "").lower() + ), } for item in activity_data["items"] ], @@ -297,43 +147,24 @@ def create_activity_schema( act = Activity(**json_ld) path = output_path / "activities" / activity_name path.mkdir(parents=True, exist_ok=True) - file_path = path / f"{activity_name}_schema" - write_obj_jsonld(act, file_path, contextfile_url=CONTEXTFILE_URL) + write_obj_jsonld(act, path / f"{activity_name}_schema", contextfile_url=CONTEXTFILE_URL) + + items_path = path / "items" + items_path.mkdir(parents=True, exist_ok=True) for item in activity_data["items"]: - it = Item(**item) - file_path_item = path / "items" / item["id"] - file_path_item.parent.mkdir( - parents=True, exist_ok=True - ) # Create parent directories - write_obj_jsonld( - it, file_path_item, contextfile_url=CONTEXTFILE_URL - ) + item_path = items_path / item["id"] + item_path.parent.mkdir(parents=True, exist_ok=True) + write_obj_jsonld(Item(**item), item_path, contextfile_url=CONTEXTFILE_URL) print(f"{activity_name} Instrument schema created") - def should_hide_item(self, item: Dict[str, Any]) -> bool: - item_id = item["id"].lower() - return ( - "_score" in item_id - or item_id.endswith("_raw") - or "@HIDDEN" in item.get("annotation", "").lower() - ) - - def create_protocol_schema( - self, - protocol_name: str, - protocol_data: Dict[str, Any], - activities: List[str], - output_path: Path, - ): + def create_protocol_schema(self, protocol_name: str, protocol_data: Dict[str, Any], activities: List[str], output_path: Path): protocol_schema = { "category": "reproschema:Protocol", "id": f"{protocol_name}_schema", "prefLabel": {"en": protocol_data["protocol_display_name"]}, - "description": { - "en": protocol_data.get("protocol_description", "") - }, + "description": {"en": protocol_data.get("protocol_description", "")}, "schemaVersion": get_context_version(CONTEXTFILE_URL), "version": protocol_data["redcap_version"], "ui": { @@ -341,71 +172,72 @@ def create_protocol_schema( { "isAbout": f"../activities/{activity}/{activity}_schema", "variableName": f"{activity}_schema", - "prefLabel": { - "en": activity.replace("_", " ").title() - }, + "prefLabel": {"en": activity.replace("_", " ").title()}, "isVis": True, } for activity in activities ], - "order": [ - f"../activities/{activity}/{activity}_schema" - for activity in activities - ], + "order": [f"../activities/{activity}/{activity}_schema" for activity in activities], "shuffle": False, }, } - prot = Protocol(**protocol_schema) protocol_dir = output_path / protocol_name protocol_dir.mkdir(parents=True, exist_ok=True) - file_path = protocol_dir / f"{protocol_name}_schema" - write_obj_jsonld(prot, file_path, contextfile_url=CONTEXTFILE_URL) - print(f"Protocol schema created in {file_path}") + write_obj_jsonld(Protocol(**protocol_schema), protocol_dir / f"{protocol_name}_schema", contextfile_url=CONTEXTFILE_URL) + print(f"Protocol schema created in {protocol_dir}") + + def clean_output_directories(self, output_path: Path): + """Remove only the folders in the output directory.""" + if output_path.exists(): + for item in output_path.iterdir(): + if item.is_dir(): + shutil.rmtree(item) + print(f"Removed directory: {item}") + print(f"Cleaned folders in output directory: {output_path}") + else: + print(f"Output directory does not exist, will be created: {output_path}") + + def remove_ds_store(self, directory: Path): + """Remove all .DS_Store files in the given directory and its subdirectories.""" + for root, dirs, files in os.walk(directory): + for file in files: + if file == '.DS_Store': + file_path = Path(root) / file + file_path.unlink() + print(f"Removed .DS_Store file: {file_path}") def convert(self, csv_file: str, output_path: str): try: df = self.load_csv(csv_file) activities = self.process_dataframe(df) - abs_output_path = Path(output_path) / self.config[ - "protocol_name" - ].replace(" ", "_") + abs_output_path = Path(output_path) / self.config["protocol_name"].replace(" ", "_") + + # Clean only the folders in the output directory before conversion + self.clean_output_directories(abs_output_path) + abs_output_path.mkdir(parents=True, exist_ok=True) for activity_name, activity_data in activities.items(): - self.create_activity_schema( - activity_name, - activity_data, - abs_output_path, - self.config["redcap_version"], - ) - - self.create_protocol_schema( - self.config["protocol_name"], - self.config, - list(activities.keys()), - abs_output_path, - ) + self.create_activity_schema(activity_name, activity_data, abs_output_path, self.config["redcap_version"]) + + self.create_protocol_schema(self.config["protocol_name"], self.config, list(activities.keys()), abs_output_path) + + # Remove .DS_Store files after conversion + self.remove_ds_store(abs_output_path) + print("Conversion completed and .DS_Store files removed.") except Exception as e: print(f"An error occurred during conversion: {str(e)}") import traceback traceback.print_exc() raise - def main(): - parser = argparse.ArgumentParser( - description="Convert a CSV file to ReproSchema format." - ) + parser = argparse.ArgumentParser(description="Convert a CSV file to ReproSchema format.") parser.add_argument("csv_file", help="Path to the input CSV file.") - parser.add_argument( - "config_file", help="Path to the YAML configuration file." - ) - parser.add_argument( - "output_path", - help="Path to the directory where the output schemas will be saved.", - ) + parser.add_argument("config_file", help="Path to the YAML configuration file.") + parser.add_argument("output_path", help="Path to the directory where the output schemas will be saved.") args = parser.parse_args() @@ -413,6 +245,5 @@ def main(): converter = ReproSchemaConverter(config) converter.convert(args.csv_file, args.output_path) - if __name__ == "__main__": - main() + main() \ No newline at end of file