[DOP-23968] Add .get_exclude_packages() to SparkS3 and Kafka
dolfinus committed Feb 18, 2025
1 parent 60aea9e commit 28f8309
Showing 6 changed files with 107 additions and 13 deletions.
2 changes: 2 additions & 0 deletions README.rst
@@ -552,9 +552,11 @@ Read files directly from S3 path, convert them to dataframe, transform it and th
# Initialize new SparkSession with Hadoop AWS libraries and Postgres driver loaded
maven_packages = SparkS3.get_packages(spark_version="3.5.4") + Postgres.get_packages()
exclude_packages = SparkS3.get_exclude_packages()
spark = (
    SparkSession.builder.appName("spark_app_onetl_demo")
    .config("spark.jars.packages", ",".join(maven_packages))
    .config("spark.jars.excludes", ",".join(exclude_packages))
    .getOrCreate()
)
18 changes: 18 additions & 0 deletions docs/changelog/next_release/340.feature.rst
@@ -0,0 +1,18 @@
Add ``SparkS3.get_exclude_packages()`` and ``Kafka.get_exclude_packages()`` methods.
Using them lets you skip downloading dependencies that are not required by a specific connector or that are already part of Spark/PySpark:

.. code:: python

    from onetl.connection import SparkS3, Kafka
    from pyspark.sql import SparkSession

    maven_packages = [
        *SparkS3.get_packages(spark_version="3.5.4"),
        *Kafka.get_packages(spark_version="3.5.4"),
    ]
    exclude_packages = SparkS3.get_exclude_packages() + Kafka.get_exclude_packages()

    spark = (
        SparkSession.builder.appName("spark_app_onetl_demo")
        .config("spark.jars.packages", ",".join(maven_packages))
        .config("spark.jars.excludes", ",".join(exclude_packages))
        .getOrCreate()
    )
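
For reference, each returned entry is a plain ``groupId:artifactId`` Maven coordinate without a version, which is the format ``spark.jars.excludes`` expects. A minimal sketch of inspecting the helpers directly (output abridged, taken from the lists added in this commit):

.. code:: python

    from onetl.connection import SparkS3, Kafka

    # Entries are "groupId:artifactId" coordinates (no version),
    # matching what spark.jars.excludes / spark-submit --exclude-packages expect
    print(SparkS3.get_exclude_packages()[:3])
    # ['com.google.cloud.bigdataoss:gcs-connector',
    #  'org.apache.hadoop:hadoop-aliyun',
    #  'org.apache.hadoop:hadoop-azure-datalake']

    print(Kafka.get_exclude_packages()[:2])
    # ['org.apache.hadoop:hadoop-client-api',
    #  'org.apache.hadoop:hadoop-client-runtime']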
36 changes: 36 additions & 0 deletions onetl/connection/db_connection/kafka/connection.py
@@ -122,9 +122,11 @@ class Kafka(DBConnection):
# Create Spark session with Kafka connector loaded
maven_packages = Kafka.get_packages(spark_version="3.2.4")
exclude_packages = Kafka.get_exclude_packages()
spark = (
    SparkSession.builder.appName("spark-app-name")
    .config("spark.jars.packages", ",".join(maven_packages))
    .config("spark.jars.excludes", ",".join(exclude_packages))
    .getOrCreate()
)
@@ -436,6 +438,40 @@ def get_packages(
f"org.apache.spark:spark-sql-kafka-0-10_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}",
]

@slot
@classmethod
def get_exclude_packages(cls) -> list[str]:
    """
    Get package names to be excluded by Spark. |support_hooks|

    .. versionadded:: 0.13.0

    Examples
    --------

    .. code:: python

        from onetl.connection import Kafka

        Kafka.get_exclude_packages()
    """

    return [
        # already a part of Spark bundle
        "org.apache.hadoop:hadoop-client-api",
        "org.apache.hadoop:hadoop-client-runtime",
        "com.fasterxml.jackson.core:jackson-annotations",
        "com.fasterxml.jackson.core:jackson-core",
        "com.fasterxml.jackson.core:jackson-databind",
        "com.google.code.findbugs:jsr305",
        "commons-codec:commons-codec",
        "commons-logging:commons-logging",
        "org.lz4:lz4-java",
        "org.slf4j:slf4j-api",
        "org.xerial.snappy:snappy-java",
    ]


def __enter__(self):
    return self

53 changes: 46 additions & 7 deletions onetl/connection/file_df_connection/spark_s3/connection.py
@@ -134,13 +134,8 @@ class SparkS3(SparkFileDFConnection):
# Create Spark session with Hadoop AWS libraries loaded
maven_packages = SparkS3.get_packages(spark_version="3.5.4")
# Some dependencies are not used, but downloading takes a lot of time. Skipping them.
excluded_packages = [
"com.google.cloud.bigdataoss:gcs-connector",
"org.apache.hadoop:hadoop-aliyun",
"org.apache.hadoop:hadoop-azure-datalake",
"org.apache.hadoop:hadoop-azure",
]
# Some packages are not used, but downloading takes a lot of time. Skipping them.
excluded_packages = SparkS3.get_exclude_packages()
spark = (
SparkSession.builder.appName("spark-app-name")
.config("spark.jars.packages", ",".join(maven_packages))
@@ -254,6 +249,50 @@ def get_packages(
# https://mvnrepository.com/artifact/org.apache.spark/spark-hadoop-cloud
return [f"org.apache.spark:spark-hadoop-cloud_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}"]

@slot
@classmethod
def get_exclude_packages(cls) -> list[str]:
    """
    Get package names to be excluded by Spark. |support_hooks|

    .. versionadded:: 0.13.0

    Examples
    --------

    .. code:: python

        from onetl.connection import SparkS3

        SparkS3.get_exclude_packages()
    """

    return [
        # heavy and not used
        "com.google.cloud.bigdataoss:gcs-connector",
        "org.apache.hadoop:hadoop-aliyun",
        "org.apache.hadoop:hadoop-azure-datalake",
        "org.apache.hadoop:hadoop-azure",
        "org.apache.hadoop:hadoop-cos",
        "org.apache.hadoop:hadoop-openstack",
        "org.apache.hadoop:hadoop-huaweicloud",
        # already a part of Spark bundle
        "org.apache.hadoop:hadoop-client-api",
        "org.apache.hadoop:hadoop-client-runtime",
        "com.fasterxml.jackson.core:jackson-annotations",
        "com.fasterxml.jackson.core:jackson-core",
        "com.fasterxml.jackson.core:jackson-databind",
        "com.google.code.findbugs:jsr305",
        "commons-codec:commons-codec",
        "commons-logging:commons-logging",
        "joda-time:joda-time",
        "org.apache.httpcomponents:httpclient",
        "org.apache.httpcomponents:httpcore",
        "org.slf4j:slf4j-api",
        "org.xerial.snappy:snappy-java",
    ]


@slot
def path_from_string(self, path: os.PathLike | str) -> RemotePath:
    return RemotePath(os.fspath(path))
9 changes: 4 additions & 5 deletions tests/fixtures/spark.py
@@ -121,12 +121,11 @@ def maven_packages(request):

@pytest.fixture(scope="session")
def excluded_packages():
# These packages are a part of org.apache.spark:spark-hadoop-cloud, but not used in tests
from onetl.connection import Kafka, SparkS3

return [
"com.google.cloud.bigdataoss:gcs-connector",
"org.apache.hadoop:hadoop-aliyun",
"org.apache.hadoop:hadoop-azure-datalake",
"org.apache.hadoop:hadoop-azure",
*SparkS3.get_exclude_packages(),
*Kafka.get_exclude_packages(),
]


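For context, a hypothetical sketch of how a session-scoped ``spark`` fixture might consume the ``maven_packages`` and ``excluded_packages`` fixtures; the actual ``spark`` fixture in ``tests/fixtures/spark.py`` is not shown in this diff, so the fixture name, app name, and options below are assumptions modelled on the README example:

.. code:: python

    import pytest
    from pyspark.sql import SparkSession


    @pytest.fixture(scope="session")
    def spark(maven_packages, excluded_packages):
        # Hypothetical session fixture: downloads only the required jars,
        # skipping the coordinates returned by get_exclude_packages()
        return (
            SparkSession.builder.appName("onetl-tests")
            .config("spark.jars.packages", ",".join(maven_packages))
            .config("spark.jars.excludes", ",".join(excluded_packages))
            .getOrCreate()
        )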
@@ -46,7 +46,7 @@ def test_greenplum_reader_wrong_table_name(spark_mock):
)


def test_postgres_reader_hint_unsupported(spark_mock):
def test_greenplum_reader_hint_unsupported(spark_mock):
greenplum = Greenplum(host="some_host", user="user", database="database", password="passwd", spark=spark_mock)

with pytest.raises(
