diff --git a/quinn/dataframe_helpers.py b/quinn/dataframe_helpers.py index 687457ff..81d6b55e 100644 --- a/quinn/dataframe_helpers.py +++ b/quinn/dataframe_helpers.py @@ -27,7 +27,7 @@ def column_to_list(df: DataFrame, col_name: str) -> list[Any]: # sparksession from df is not available in older versions of pyspark if sys.modules["pyspark"].__version__ < "3.3.0": - return df.select(col_name).rdd.flatMap(lambda x: x).collect() + return [row[0] for row in df.select(col_name).collect()] spark_config = df.sparkSession.sparkContext.getConf().getAll() @@ -40,7 +40,7 @@ def column_to_list(df: DataFrame, col_name: str) -> list[Any]: if pyarrow_valid and pandas_valid: return df.select(col_name).toPandas()[col_name].tolist() - return df.select(col_name).rdd.flatMap(lambda x: x).collect() + return [row[0] for row in df.select(col_name).collect()] def two_columns_to_dictionary( diff --git a/quinn/transformations.py b/quinn/transformations.py index 9d041c26..f377e863 100644 --- a/quinn/transformations.py +++ b/quinn/transformations.py @@ -252,7 +252,7 @@ def fix_nullability(field: StructField, result_dict: dict) -> None: spark = SparkSession.getActiveSession() spark = spark if spark is not None else SparkSession.builder.getOrCreate() - return spark.createDataFrame(output.rdd, output.schema) + return output def flatten_struct(df: DataFrame, col_name: str, separator: str = ":") -> DataFrame: