diff --git a/reproschema/to_reproschema.py b/reproschema/to_reproschema.py index bc10a52..fe45aa2 100755 --- a/reproschema/to_reproschema.py +++ b/reproschema/to_reproschema.py @@ -22,7 +22,6 @@ def load_config(config_file: str) -> Dict[str, Any]: with open(config_file, "r") as f: return yaml.safe_load(f) - class ReproSchemaConverter: def __init__(self, config: Dict[str, Any]): self.config = config @@ -33,18 +32,95 @@ def __init__(self, config: Dict[str, Any]): self.branch_logic_pattern = re.compile( r"\[([^\]]+)\]|\b(AND|OR)\b|([^> tuple: + if pd.isna(response_option_str): + return [], ['xsd:string'] - def clean_validity_administration_fields(self, df: pd.DataFrame) -> pd.DataFrame: + # Special handling for Validity and Administration fields + if item_name.endswith('_Validity') or item_name.endswith('_Administration'): + return self.process_validity_administration_options(item_name) + + response_option = [] + response_option_value_type = set() + + choices = response_option_str.split('{-}') + for choice in choices: + choice = choice.strip() + if choice == "NULL=>''": + response_option.append({'name': {'en': 'NULL'}, 'value': None}) + response_option_value_type.add('xsd:string') + elif '=>' in choice: + value, name = choice.split('=>') + value = value.strip("'").strip('"') + name = name.strip("'").strip('"') + if value.lower() == 'null': + value = None + response_option.append({'name': {'en': name}, 'value': value}) + response_option_value_type.add('xsd:string') + else: + print(f"Warning: Unexpected choice format '{choice}' in {item_name} field") + + if not response_option: + print(f"Warning: No valid choices found for {item_name}") + response_option.append({'name': {'en': 'No valid choices'}, 'value': None}) + + return response_option, list(response_option_value_type) + + def process_validity_administration_options(self, item_name: str) -> tuple: + if item_name.endswith('_Validity'): + choices = [ + {'name': {'en': 'Questionable'}, 'value': 'Questionable'}, + {'name': {'en': 'Invalid'}, 'value': 'Invalid'}, + {'name': {'en': 'Valid'}, 'value': 'Valid'} + ] + elif item_name.endswith('_Administration'): + choices = [ + {'name': {'en': 'None'}, 'value': 'None'}, + {'name': {'en': 'Partial'}, 'value': 'Partial'}, + {'name': {'en': 'All'}, 'value': 'All'} + ] + else: + print(f"Warning: Unexpected field type for {item_name}") + choices = [{'name': {'en': 'No valid choices'}, 'value': None}] + + return choices, ['xsd:string'] + + def preprocess_validity_administration_fields(self, df: pd.DataFrame) -> pd.DataFrame: validity_admin_fields = [col for col in df.columns if col.endswith('_Validity') or col.endswith('_Administration')] for field in validity_admin_fields: - df[field] = df[field].apply(lambda x: x.replace("''", "'") if isinstance(x, str) else x) + df[field] = "''Questionable'=>'Questionable'{-}'Invalid'=>'Invalid'{-}'Valid'=>'Valid'" if field.endswith('_Validity') else "''None'=>'None'{-}'Partial'=>'Partial'{-}'All'=>'All'" return df def load_csv(self, csv_file: str) -> pd.DataFrame: - df = pd.read_csv(csv_file) - df.columns = df.columns.str.strip().str.replace('"', '') - df = self.clean_validity_administration_fields(df) - return df + try: + df = pd.read_csv(csv_file) + df.columns = df.columns.str.strip().str.replace('"', '') + df = self.preprocess_validity_administration_fields(df) + + # Log the first few rows of problematic columns + problem_columns = [col for col in df.columns if col.endswith('_Validity') or col.endswith('_Administration')] + for col in problem_columns: + print(f"\nFirst few entries of {col}:") + print(df[col].head()) + + self.log_response_options(df) + return df + except Exception as e: + print(f"Error loading CSV file: {str(e)}") + raise + def process_dataframe(self, df: pd.DataFrame) -> Dict[str, Dict[str, Any]]: grouped = df.groupby(self.csv_to_reproschema_map["activity_name"]) activities = {} @@ -127,8 +203,8 @@ def process_response_options(self, response_option_str: str, item_name: str) -> elif '=>' in choice: # Handle cases like ''Questionable'=>'Questionable'' or '0'=>'No' value, name = choice.split('=>') - value = value.strip("'") - name = name.strip("'") + value = value.strip("'").strip('"') + name = name.strip("'").strip('"') if value.lower() == 'null': value = None response_option.append({'name': {'en': name}, 'value': value}) @@ -136,6 +212,11 @@ def process_response_options(self, response_option_str: str, item_name: str) -> else: print(f"Warning: Unexpected choice format '{choice}' in {item_name} field") + if not response_option: + print(f"Warning: No valid choices found for {item_name}") + # Add a default option to prevent empty choices + response_option.append({'name': {'en': 'No valid choices'}, 'value': None}) + return response_option, list(response_option_value_type) def process_additional_notes( @@ -308,6 +389,8 @@ def convert(self, csv_file: str, output_path: str): ) except Exception as e: print(f"An error occurred during conversion: {str(e)}") + import traceback + traceback.print_exc() raise