Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spark connect attempt 2 #192

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmarks/create_benchmark_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def save_benchmark_df(

builder = (
SparkSession.builder.appName("MyApp")
.remote("sc://localhost")
.config("spark.executor.memory", "20G")
.config("spark.driver.memory", "25G")
.config("spark.sql.shuffle.partitions", "2")
Expand Down
1 change: 1 addition & 0 deletions benchmarks/visualize_benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def get_benchmark_date(benchmark_path: str) -> str:
if __name__ == "__main__":
spark = (
SparkSession.builder.appName("MyApp") # type: ignore # noqa: PGH003
.remote("sc://localhost")
.config("spark.executor.memory", "10G")
.config("spark.driver.memory", "25G")
.config("spark.sql.shuffle.partitions", "2")
Expand Down
1,740 changes: 879 additions & 861 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ build-backend = "poetry.masonry.api"
###########################################################################

[tool.poetry.dependencies]
python = ">=3.7,<4.0"
python = ">=3.8,<4.0"


###########################################################################
Expand All @@ -37,14 +37,14 @@ optional = true
optional = true

[tool.poetry.group.development.dependencies]
pyspark = ">2"
pyspark = "^3.5.0"
semver = "^3"

[tool.poetry.group.testing.dependencies]
pytest = "^7"
chispa = "0.9.4"
pytest-describe = "^2"
pyspark = ">2"
pyspark = "^3.5.0"
semver = "^3"

[tool.poetry.group.linting.dependencies]
Expand Down
37 changes: 19 additions & 18 deletions quinn/extensions/column_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,55 @@
from pyspark.sql.functions import lit, trim, when


def isFalsy(self: Column) -> Column:
def isFalsy(col: Column) -> Column:
"""Returns a Column indicating, for each value in the Column, whether it is False or NULL (**falsy**).

Each element in the resulting column is True if the corresponding element of the
input Column is either NULL or False, and False otherwise. This is accomplished by
combining the ``isNull`` condition with an equality comparison against a literal
False value using ``|``, and then wrapping the result in a **when** statement.

:param self: Column object
:param col: Column object
:returns: Column object
:rtype: Column
"""
return when(self.isNull() | (self == lit(False)), True).otherwise(False)
return when(col.isNull() | (col == lit(False)), True).otherwise(False)


def isTruthy(self: Column) -> Column:
def isTruthy(col: Column) -> Column:
"""Calculates a boolean expression that is the opposite of isFalsy for the given ``Column``.

:param Column self: The ``Column`` to calculate the opposite of isFalsy for.
:param Column col: The ``Column`` to calculate the opposite of isFalsy for.
:returns: A ``Column`` with the results of the calculation.
:rtype: Column
"""
return ~(self.isFalsy())
return ~(col.isFalsy())


def isFalse(self: Column) -> Column:
def isFalse(col: Column) -> Column:
"""Function checks if the column is equal to False and returns the column.

:param self: Column
:param col: Column
:return: Column
:rtype: Column
"""
return self == lit(False)
return col == lit(False)


def isTrue(self: Column) -> Column:
def isTrue(col: Column) -> Column:
"""Function takes a column of type Column as an argument and returns a column of type Column.

It evaluates whether each element in the column argument is equal to True:
True yields True, False yields False, and NULL yields NULL.

:param self: Column object
:param col: Column object
:returns: Column object
:rtype: Column
"""
return self == lit(True)
return col == lit(True)


def isNullOrBlank(self: Column) -> Column:
def isNullOrBlank(col: Column) -> Column:
r"""Returns a Boolean value which expresses whether a given column is ``null`` or contains only blank characters.

:param \*\*self: The :class:`Column` to check.
Expand All @@ -63,17 +63,17 @@ def isNullOrBlank(self: Column) -> Column:
blank characters, or ``False`` otherwise.
:rtype: Column
"""
return (self.isNull()) | (trim(self) == "")
return (col.isNull()) | (trim(col) == "")


def isNotIn(self: Column, _list: list[Any]) -> Column:
def isNotIn(col: Column, _list: list[Any]) -> Column:
"""To see if a value is not in a list of values.

:param self: Column object
:param col: Column object
:param _list: list[Any] of values that the column must not match
:rtype: Column
"""
return ~(self.isin(_list))
return ~(col.isin(_list))


def nullBetween(self: Column, lower: Column, upper: Column) -> Column:
Expand All @@ -96,7 +96,8 @@ def nullBetween(self: Column, lower: Column, upper: Column) -> Column:
)


Column.isFalsy = isFalsy
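# Keep an existing Column.isFalsy if one is already defined; otherwise attach ours (previously an unconditional ``Column.isFalsy = isFalsy``).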
Column.isFalsy = getattr(Column, "isFalsy", isFalsy)
Column.isTruthy = isTruthy
Column.isFalse = isFalse
Column.isTrue = isTrue
Expand Down
2 changes: 1 addition & 1 deletion quinn/transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ def fix_nullability(field: StructField, result_dict: dict) -> None:
fix_nullability(field, result_dict)

if not hasattr(SparkSession, "getActiveSession"): # spark 2.4
spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
else:
spark = SparkSession.getActiveSession()
spark = spark if spark is not None else SparkSession.builder.getOrCreate()
Expand Down
14 changes: 7 additions & 7 deletions tests/extensions/test_column_ext.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def test_is_falsy():
("expected", BooleanType(), True),
],
)
actual_df = source_df.withColumn("is_has_stuff_falsy", F.col("has_stuff").isFalsy())
actual_df = source_df.withColumn("is_has_stuff_falsy", isFalsy(F.col("has_stuff")))
chispa.assert_column_equality(actual_df, "is_has_stuff_falsy", "expected")


Expand All @@ -27,7 +27,7 @@ def test_is_truthy():
[("has_stuff", BooleanType(), True), ("expected", BooleanType(), True)],
)
actual_df = source_df.withColumn(
"is_has_stuff_truthy", F.col("has_stuff").isTruthy()
"is_has_stuff_truthy", isTruthy(F.col("has_stuff"))
)
chispa.assert_column_equality(actual_df, "is_has_stuff_truthy", "expected")

Expand All @@ -38,7 +38,7 @@ def test_is_false():
[(True, False), (False, True), (None, None)],
[("has_stuff", BooleanType(), True), ("expected", BooleanType(), True)],
)
actual_df = source_df.withColumn("is_has_stuff_false", F.col("has_stuff").isFalse())
actual_df = source_df.withColumn("is_has_stuff_false", isFalse(F.col("has_stuff")))
chispa.assert_column_equality(actual_df, "is_has_stuff_false", "expected")


Expand All @@ -48,7 +48,7 @@ def test_is_true():
[(True, True), (False, False), (None, None)],
[("has_stuff", BooleanType(), True), ("expected", BooleanType(), True)],
)
actual_df = source_df.withColumn("is_stuff_true", F.col("has_stuff").isTrue())
actual_df = source_df.withColumn("is_stuff_true", isTrue(F.col("has_stuff")))
chispa.assert_column_equality(actual_df, "is_stuff_true", "expected")


Expand All @@ -67,7 +67,7 @@ def test_is_null_or_blank():
],
)
actual_df = source_df.withColumn(
"is_blah_null_or_blank", F.col("blah").isNullOrBlank()
"is_blah_null_or_blank", isNullOrBlank(F.col("blah"))
)
chispa.assert_column_equality(actual_df, "is_blah_null_or_blank", "expected")

Expand All @@ -87,7 +87,7 @@ def test_is_not_in():
)
bobs_hobbies = ["dancing", "snowboarding"]
actual_df = source_df.withColumn(
"is_not_bobs_hobby", F.col("fun_thing").isNotIn(bobs_hobbies)
"is_not_bobs_hobby", isNotIn(F.col("fun_thing"), bobs_hobbies)
)
chispa.assert_column_equality(actual_df, "is_not_bobs_hobby", "expected")

Expand All @@ -113,6 +113,6 @@ def test_null_between():
],
)
actual_df = source_df.withColumn(
"is_between", F.col("age").nullBetween(F.col("lower_age"), F.col("upper_age"))
"is_between", nullBetween(F.col("age"), (F.col("lower_age"), F.col("upper_age")))
)
chispa.assert_column_equality(actual_df, "is_between", "expected")
2 changes: 1 addition & 1 deletion tests/spark.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("chispa").getOrCreate()
spark = SparkSession.builder.remote("sc://localhost").appName("chispa").getOrCreate()
Loading