Skip to content

Commit

Permalink
modify response options
Browse files Browse the repository at this point in the history
  • Loading branch information
yibeichan committed Sep 26, 2024
1 parent 2d85f9c commit 4b4a018
Showing 1 changed file with 92 additions and 9 deletions.
101 changes: 92 additions & 9 deletions reproschema/to_reproschema.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ def load_config(config_file: str) -> Dict[str, Any]:
with open(config_file, "r") as f:
return yaml.safe_load(f)


class ReproSchemaConverter:
def __init__(self, config: Dict[str, Any]):
self.config = config
Expand All @@ -33,18 +32,95 @@ def __init__(self, config: Dict[str, Any]):
self.branch_logic_pattern = re.compile(
r"\[([^\]]+)\]|\b(AND|OR)\b|([^><!=])=|sum\(([^)]+)\)"
)

def log_response_options(self, df: pd.DataFrame):
response_option_key = self.csv_to_reproschema_map['response_option']
for column in df.columns:
first_value = df[column].iloc[0]
if isinstance(first_value, str) and response_option_key in first_value:
print(f"\nResponse options for {column}:")
print(first_value)
elif pd.notna(first_value):
print(f"\nNon-string value in {column}:")
print(f"Type: {type(first_value)}, Value: {first_value}")

def process_response_options(self, response_option_str: str, item_name: str) -> tuple:
if pd.isna(response_option_str):
return [], ['xsd:string']

def clean_validity_administration_fields(self, df: pd.DataFrame) -> pd.DataFrame:
# Special handling for Validity and Administration fields
if item_name.endswith('_Validity') or item_name.endswith('_Administration'):
return self.process_validity_administration_options(item_name)

response_option = []
response_option_value_type = set()

choices = response_option_str.split('{-}')
for choice in choices:
choice = choice.strip()
if choice == "NULL=>''":
response_option.append({'name': {'en': 'NULL'}, 'value': None})
response_option_value_type.add('xsd:string')
elif '=>' in choice:
value, name = choice.split('=>')
value = value.strip("'").strip('"')
name = name.strip("'").strip('"')
if value.lower() == 'null':
value = None
response_option.append({'name': {'en': name}, 'value': value})
response_option_value_type.add('xsd:string')
else:
print(f"Warning: Unexpected choice format '{choice}' in {item_name} field")

if not response_option:
print(f"Warning: No valid choices found for {item_name}")
response_option.append({'name': {'en': 'No valid choices'}, 'value': None})

return response_option, list(response_option_value_type)

def process_validity_administration_options(self, item_name: str) -> tuple:
if item_name.endswith('_Validity'):
choices = [
{'name': {'en': 'Questionable'}, 'value': 'Questionable'},
{'name': {'en': 'Invalid'}, 'value': 'Invalid'},
{'name': {'en': 'Valid'}, 'value': 'Valid'}
]
elif item_name.endswith('_Administration'):
choices = [
{'name': {'en': 'None'}, 'value': 'None'},
{'name': {'en': 'Partial'}, 'value': 'Partial'},
{'name': {'en': 'All'}, 'value': 'All'}
]
else:
print(f"Warning: Unexpected field type for {item_name}")
choices = [{'name': {'en': 'No valid choices'}, 'value': None}]

return choices, ['xsd:string']

def preprocess_validity_administration_fields(self, df: pd.DataFrame) -> pd.DataFrame:
validity_admin_fields = [col for col in df.columns if col.endswith('_Validity') or col.endswith('_Administration')]
for field in validity_admin_fields:
df[field] = df[field].apply(lambda x: x.replace("''", "'") if isinstance(x, str) else x)
df[field] = "''Questionable'=>'Questionable'{-}'Invalid'=>'Invalid'{-}'Valid'=>'Valid'" if field.endswith('_Validity') else "''None'=>'None'{-}'Partial'=>'Partial'{-}'All'=>'All'"
return df

def load_csv(self, csv_file: str) -> pd.DataFrame:
df = pd.read_csv(csv_file)
df.columns = df.columns.str.strip().str.replace('"', '')
df = self.clean_validity_administration_fields(df)
return df
try:
df = pd.read_csv(csv_file)
df.columns = df.columns.str.strip().str.replace('"', '')
df = self.preprocess_validity_administration_fields(df)

# Log the first few rows of problematic columns
problem_columns = [col for col in df.columns if col.endswith('_Validity') or col.endswith('_Administration')]
for col in problem_columns:
print(f"\nFirst few entries of {col}:")
print(df[col].head())

self.log_response_options(df)
return df
except Exception as e:
print(f"Error loading CSV file: {str(e)}")
raise

def process_dataframe(self, df: pd.DataFrame) -> Dict[str, Dict[str, Any]]:
grouped = df.groupby(self.csv_to_reproschema_map["activity_name"])
activities = {}
Expand Down Expand Up @@ -127,15 +203,20 @@ def process_response_options(self, response_option_str: str, item_name: str) ->
elif '=>' in choice:
# Handle cases like ''Questionable'=>'Questionable'' or '0'=>'No'
value, name = choice.split('=>')
value = value.strip("'")
name = name.strip("'")
value = value.strip("'").strip('"')
name = name.strip("'").strip('"')
if value.lower() == 'null':
value = None
response_option.append({'name': {'en': name}, 'value': value})
response_option_value_type.add('xsd:string')
else:
print(f"Warning: Unexpected choice format '{choice}' in {item_name} field")

if not response_option:
print(f"Warning: No valid choices found for {item_name}")
# Add a default option to prevent empty choices
response_option.append({'name': {'en': 'No valid choices'}, 'value': None})

return response_option, list(response_option_value_type)

def process_additional_notes(
Expand Down Expand Up @@ -308,6 +389,8 @@ def convert(self, csv_file: str, output_path: str):
)
except Exception as e:
print(f"An error occurred during conversion: {str(e)}")
import traceback
traceback.print_exc()
raise


Expand Down

0 comments on commit 4b4a018

Please sign in to comment.