Integrate the Spark-Connect tests with the existing test suite & CI #256

Merged
16 commits
33eefc9
Update the CONTRIBUTING.md file to add details regarding pre-commit &…
nijanthanvijayakumar Jul 16, 2024
b917388
Remove the test_spark_connect as it is not required separately
nijanthanvijayakumar Jul 17, 2024
d56c1c5
Fix the failing tests for the dataframe_helpers
nijanthanvijayakumar Jul 19, 2024
d12ab07
Update the CI YAML file for running the Spark-Connect tests
nijanthanvijayakumar Aug 4, 2024
55bc170
Update the failing functions & test-cases to throw exceptions for Spa…
nijanthanvijayakumar Aug 5, 2024
598bc34
Update the test cases to skip the sort_columns execution if SC < 3.5.2
nijanthanvijayakumar Aug 5, 2024
b0c35a6
Update the test case wrapper to check SparkConnect compatibility
nijanthanvijayakumar Aug 5, 2024
936409c
Wrap the describe_sort_columns with check sc compatibility decorator
nijanthanvijayakumar Aug 6, 2024
bcd9e9c
Use the spark connect compatibility decorator only on the child-tests
nijanthanvijayakumar Aug 6, 2024
86ed8a0
Remove the feature/* branch from the workflow trigger
nijanthanvijayakumar Aug 6, 2024
0c6422a
Include the doc-string for the Spark-Connect wrapper function
nijanthanvijayakumar Aug 6, 2024
4bac2c9
Fix the linting errors by introducing custom exception
nijanthanvijayakumar Aug 6, 2024
c0b300e
Update the Spark-Connect test cases to use the decorator function
nijanthanvijayakumar Aug 9, 2024
93e35ff
Remove the unnecessary doc-string comment from new Exception
nijanthanvijayakumar Aug 9, 2024
c35299b
Update the gitignore file to exclude Spark-Connected binaries downloaded
nijanthanvijayakumar Aug 9, 2024
8bf53b5
Exclude the Spark archive as well from the Git files
nijanthanvijayakumar Aug 9, 2024
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -74,9 +74,9 @@ jobs:
run: |
if [[ "${SPARK_VERSION}" > "3.4" ]]; then
sh scripts/run_spark_connect_server.sh
# The tests should be called from here.
make test
else
echo "Skipping Spark-Connect tests for Spark version <= 3.4"
echo "Skipping Spark-Connect tests for Spark version ${SPARK_VERSION}, which is <= 3.4"
fi

check-license-headers:
6 changes: 5 additions & 1 deletion .gitignore
@@ -38,4 +38,8 @@ site
# Benchmarking
*.crc
*.parquet
_SUCCESS
_SUCCESS

# Spark-Connect related files/binaries
spark-*-bin-hadoop3/**
spark-*-bin-hadoop3.tgz
98 changes: 98 additions & 0 deletions CONTRIBUTING.md
@@ -14,6 +14,10 @@ Scan through our [existing issues](https://github.com/MrPowers/quinn/issues) to

You can find a list of [good first issues](https://github.com/MrPowers/quinn/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) that can help you better understand the project's code base.

### Auto-assigning issues

We have a workflow that automatically assigns issues to users who comment `take` on an issue. It is configured in the `.github/workflows/assign-on-comment.yml` file: when a user comments `take` on an issue, a GitHub Action assigns the issue to that user unless it is already assigned.

## Contributing

### Fork the repository
@@ -49,6 +53,17 @@ make install_deps

To run the Spark tests you need a properly configured Java installation. Apache Spark currently supports mainly Java 8 (1.8). You can find instructions on how to set up Java [here](https://www.java.com/en/download/help/download_options.html). When you run the Spark tests, the `JAVA_HOME` environment variable should point to your Java 8 installation.
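
For example, on macOS you can resolve a Java 8 home with the built-in `java_home` helper; the Linux path below is only a typical example and depends on how Java was installed on your machine:

```shell
# macOS: resolve the Java 8 home directory via the java_home helper
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)

# Linux: point JAVA_HOME at your Java 8 installation, for example:
# export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

# Verify which Java version will be picked up
"$JAVA_HOME/bin/java" -version
```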

### Pre-commit installation and execution

We use pre-commit hooks to ensure code quality. The hook configuration is in the `.pre-commit-config.yaml` file. To install the hooks, run:
```shell
poetry shell
poetry run pre-commit install
```
To run pre-commit hooks manually, use:
```shell
pre-commit run --all-files
```
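
You can also run a single hook instead of the whole suite. Hook ids are defined in `.pre-commit-config.yaml`; the `<hook-id>` below is a placeholder, not a real id:

```shell
# Run one specific hook against all files; replace <hook-id> with an id from .pre-commit-config.yaml
pre-commit run <hook-id> --all-files
```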

### Running Tests

@@ -57,6 +72,89 @@ You can run test as following:
```shell
make test
```
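
If you only want to run a subset of the tests while iterating, you can also invoke pytest directly through poetry. This is a sketch assuming the project's usual poetry/pytest setup; adjust the file and test names to whatever you are working on:

```shell
# Run a single test module
poetry run pytest tests/test_functions.py

# Run a single test by name
poetry run pytest tests/test_functions.py -k "test_single_space"
```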

#### GitHub Actions local setup using 'act'

You can run GitHub Actions locally using the `act` tool. The configuration for GitHub Actions is in
the `.github/workflows/ci.yml` file. To install `act`, follow the
instructions [here](https://github.com/nektos/act#installation). To run a specific job, use:

```shell
act -j <job-name>
```

For example, to run the `test` job, use:

```shell
act -j test
```

If you need help with `act`, use:

```shell
act --help
```

For MacBooks with M1 processors, you might have to add the `--container-architecture` flag:

```shell
act -j <job-name> --container-architecture linux/arm64
```

#### Running Spark-Connect tests locally

To run the Spark-Connect tests locally, follow the steps below. Please note that this only works on macOS/UNIX-based systems.

1. **Set up the required environment variables:** The following variables need to be set so that the shell script that installs the Spark-Connect binary and starts the server picks up the correct version.

The version can either be `3.5.1` or `3.4.3`, as those are the ones used in our CI.

```shell
export SPARK_VERSION=3.5.1
export SPARK_CONNECT_MODE_ENABLED=1
```

2. **Check that the required environment variables are set:** Run the commands below to verify that both variables are set.

```shell
echo $SPARK_VERSION
echo $SPARK_CONNECT_MODE_ENABLED
```

3. **Install the required system packages:** Run the command below to install `wget`.

For Mac users:
```shell
brew install wget
```

For Ubuntu users:
```shell
sudo apt-get install wget
```

4. **Execute the shell script:** Run the command below to execute the shell script that installs Spark-Connect and starts the server.

```shell
sh scripts/run_spark_connect_server.sh
```

5. **Run the tests:** Run the command below to execute the tests against Spark-Connect.

```shell
make test
```

6. **Clean up:** After running the tests, you can stop the Spark-Connect server and unset the environment variables.

```shell
unset SPARK_VERSION
unset SPARK_CONNECT_MODE_ENABLED
```
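
If the Spark-Connect server is still running, you can stop it with the stop script bundled in the downloaded Spark distribution. This sketch assumes your Spark version ships `sbin/stop-connect-server.sh` and that the setup script extracted the distribution into the repository root; adjust the path if your setup differs:

```shell
# Stop the Spark-Connect server started by scripts/run_spark_connect_server.sh
sh spark-*-bin-hadoop3/sbin/stop-connect-server.sh
```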

### Code style

This project follows the [PySpark style guide](https://github.com/MrPowers/spark-style-guide/blob/main/PYSPARK_STYLE_GUIDE.md). All public functions and methods should be documented in `README.md` and also should have docstrings in `sphinx format`:
2 changes: 1 addition & 1 deletion quinn/dataframe_helpers.py
@@ -40,7 +40,7 @@ def column_to_list(df: DataFrame, col_name: str) -> list[Any]:
if sys.modules["pyspark"].__version__ < "3.3.0":
return [row[0] for row in df.select(col_name).collect()]

spark_session = df.sparkSession.getActiveSession()
spark_session = df.sparkSession
if spark_session is None:
return [row[0] for row in df.select(col_name).collect()]

18 changes: 18 additions & 0 deletions quinn/functions.py
@@ -22,6 +22,8 @@
from pyspark.sql.functions import udf


import os
import sys
import uuid
from typing import Any

@@ -32,6 +34,19 @@
)


class UnsupportedSparkConnectFunctionError(Exception):
"""Raise this when a function that is not supported by Spark-Connect < v3.5.2 is called."""

def __init__(self) -> None:
"""Initialize the UnsupportedSparkConnectFunction exception.

:returns: None
:rtype: None
"""
self.message = "This function is not supported on Spark-Connect < 3.5.2"
super().__init__(self.message)


def single_space(col: Column) -> Column:
"""Function takes a column and replaces all the multiple white spaces with a single space.

@@ -196,6 +211,9 @@ def array_choice(col: Column, seed: int | None = None) -> Column:
:return: random element from the given column
:rtype: Column
"""
if sys.modules["pyspark"].__version__ < "3.5.2" and os.getenv("SPARK_CONNECT_MODE_ENABLED"):
raise UnsupportedSparkConnectFunctionError

index = (F.rand(seed) * F.size(col)).cast("int")
return col[index]

5 changes: 5 additions & 0 deletions quinn/transformations.py
@@ -13,13 +13,16 @@

from __future__ import annotations

import os
import re
import sys
from collections.abc import Callable

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F # noqa: N812
from pyspark.sql.types import ArrayType, MapType, StructField, StructType

from quinn.functions import UnsupportedSparkConnectFunctionError
from quinn.schema_helpers import complex_fields


@@ -113,6 +116,8 @@ def sort_columns( # noqa: C901,PLR0915
:return: A DataFrame with the columns sorted in the chosen order
:rtype: pyspark.sql.DataFrame
"""
if sys.modules["pyspark"].__version__ < "3.5.2" and os.getenv("SPARK_CONNECT_MODE_ENABLED"):
raise UnsupportedSparkConnectFunctionError

def sort_nested_cols(
schema: StructType,
38 changes: 38 additions & 0 deletions tests/__init__.py
@@ -0,0 +1,38 @@
import inspect
import os
import pytest
from .spark import spark
from functools import wraps


def check_spark_connect_compatibility(func):
"""
Decorator to check Spark-Connect compatibility for a given function.

This decorator will check the Spark version and the environment variable
`SPARK_CONNECT_MODE_ENABLED`. If the Spark version is less than 3.5.2 and
`SPARK_CONNECT_MODE_ENABLED` is set, it will raise an exception when the
decorated function is called. Otherwise, it will execute the function normally.

:param func: The function to be decorated.
:type func: function
:return: The wrapped function with Spark-Connect compatibility check.
:rtype: function
"""

@wraps(func)
def wrapper(*args, **kwargs):
spark_version = spark.version
if spark_version < "3.5.2" and os.getenv("SPARK_CONNECT_MODE_ENABLED"):
# Expect an exception to be raised when calling the decorated function (in this context, the test case).
with pytest.raises(Exception) as excinfo:
func(*args, **kwargs)

# Assert that the exception message matches the expected format
assert str(excinfo.value) == "This function is not supported on Spark-Connect < 3.5.2"

else:
# If the conditions are not met, call the function normally.
return func(*args, **kwargs)

return wrapper
19 changes: 13 additions & 6 deletions tests/test_functions.py
@@ -1,3 +1,5 @@
import os

import pytest

import pyspark.sql.functions as F
@@ -19,6 +21,7 @@

import datetime
import uuid
from tests import check_spark_connect_compatibility


def test_single_space():
@@ -171,8 +174,8 @@ def it_errors_out_if_with_invalid_week_start_date():
"week_start_date", quinn.week_start_date(F.col("some_date"), "hello")
)
assert (
excinfo.value.args[0]
== "The day you entered 'hello' is not valid. Here are the valid days: [Mon,Tue,Wed,Thu,Fri,Sat,Sun]"
excinfo.value.args[0]
== "The day you entered 'hello' is not valid. Here are the valid days: [Mon,Tue,Wed,Thu,Fri,Sat,Sun]"
)


@@ -228,8 +231,8 @@ def it_errors_out_if_with_invalid_week_end_date():
"week_start_date", quinn.week_end_date(F.col("some_date"), "Friday")
)
assert (
excinfo.value.args[0]
== "The day you entered 'Friday' is not valid. Here are the valid days: [Mon,Tue,Wed,Thu,Fri,Sat,Sun]"
excinfo.value.args[0]
== "The day you entered 'Friday' is not valid. Here are the valid days: [Mon,Tue,Wed,Thu,Fri,Sat,Sun]"
)


@@ -280,16 +283,19 @@ def it_works_with_integer_values():


# TODO: Figure out how to make this test deterministic locally & on CI
@check_spark_connect_compatibility
def test_array_choice():
df = quinn.create_df(spark,
# Create the DataFrame so that it can be passed to the if & else blocks
df = quinn.create_df(
spark,
[(["a", "b", "c"], "c"), (["a", "b", "c", "d"], "a"), (["x"], "x"), ([None], None)],
[("letters", ArrayType(StringType(), True), True), ("expected", StringType(), True)],
)

actual_df = df.withColumn("random_letter", quinn.array_choice(F.col("letters"), 42))
# chispa.assert_column_equality(actual_df, "random_letter", "expected")



def test_business_days_between():
df = quinn.create_df(
spark,
@@ -361,6 +367,7 @@ def test_with_extra_string():
)
chispa.assert_column_equality(actual_df, "uuid5_of_s1", "expected")


def test_is_falsy():
source_df = quinn.create_df(
spark,
20 changes: 0 additions & 20 deletions tests/test_spark_connect.py

This file was deleted.
