diff --git a/quinn/transformations.py b/quinn/transformations.py index 31f59212..7c5c7803 100644 --- a/quinn/transformations.py +++ b/quinn/transformations.py @@ -1,6 +1,6 @@ +from __future__ import annotations import re import pyspark.sql.functions as F # noqa: N812 -from __future__ import annotations from collections.abc import Callable from pyspark.sql import DataFrame from pyspark.sql.types import ArrayType, MapType, StructField, StructType @@ -100,7 +100,7 @@ def sort_columns( :return: A DataFrame with the columns sorted in the chosen order :rtype: pyspark.sql.DataFrame """ - + def sort_nested_cols(schema, is_reversed, base_field="") -> list[str]: # recursively check nested fields and sort them # https://stackoverflow.com/questions/57821538/how-to-sort-columns-of-nested-structs-alphabetically-in-pyspark @@ -282,6 +282,7 @@ def flatten_map(df: DataFrame, col_name: str, separator: str = ":") -> DataFrame [F.col(f"`{col}`") for col in df.columns if col != col_name] + key_cols, ) + def flatten_dataframe( df: DataFrame, separator: str = ":", @@ -331,6 +332,7 @@ def flatten_dataframe( >>> flattened_df_with_hyphen = flatten_dataframe(df, replace_char="-") >>> flattened_df_with_hyphen.show() """ + def sanitize_column_name(name: str, rc: str = "_") -> str: """Sanitizes column names by replacing special characters with the specified character. @@ -353,7 +355,9 @@ def explode_array(df: DataFrame, col_name: str) -> DataFrame: :return: The DataFrame with the exploded ArrayType column. :rtype: DataFrame """ - return df.select("*", F.explode_outer(F.col(f"`{col_name}`")).alias(col_name)).drop( + return df.select( + "*", F.explode_outer(F.col(f"`{col_name}`")).alias(col_name) + ).drop( col_name, ) @@ -380,4 +384,4 @@ def explode_array(df: DataFrame, col_name: str) -> DataFrame: ] df = df.toDF(*sanitized_columns) # noqa: PD901 - return df \ No newline at end of file + return df