From 7e9941a1a3c257927f8e4f1bb3350f40be7bfebe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D1=82=D1=8B=D0=BD=D0=BE=D0=B2=20=D0=9C?=
 =?UTF-8?q?=D0=B0=D0=BA=D1=81=D0=B8=D0=BC=20=D0=A1=D0=B5=D1=80=D0=B3=D0=B5?=
 =?UTF-8?q?=D0=B5=D0=B2=D0=B8=D1=87?=
Date: Tue, 18 Feb 2025 11:19:50 +0000
Subject: [PATCH] [DOP-23968] Add .get_exclude_packages() to SparkS3 and Kafka

---
 README.rst                                    |  2 +
 docs/changelog/next_release/341.feature.rst   | 18 +++++++
 .../db_connection/kafka/connection.rst        |  2 +-
 .../spark_s3/connection.rst                   |  2 +-
 .../db_connection/kafka/connection.py         | 36 +++++++++++++
 .../file_df_connection/spark_s3/connection.py | 53 ++++++++++++++++---
 tests/fixtures/spark.py                       |  9 ++--
 .../test_greenplum_reader_unit.py             |  2 +-
 8 files changed, 109 insertions(+), 15 deletions(-)
 create mode 100644 docs/changelog/next_release/341.feature.rst

diff --git a/README.rst b/README.rst
index 4cd67e756..50219a2bb 100644
--- a/README.rst
+++ b/README.rst
@@ -552,9 +552,11 @@ Read files directly from S3 path, convert them to dataframe, transform it and th
 
     # Initialize new SparkSession with Hadoop AWS libraries and Postgres driver loaded
     maven_packages = SparkS3.get_packages(spark_version="3.5.4") + Postgres.get_packages()
+    exclude_packages = SparkS3.get_exclude_packages()
     spark = (
         SparkSession.builder.appName("spark_app_onetl_demo")
         .config("spark.jars.packages", ",".join(maven_packages))
+        .config("spark.jars.excludes", ",".join(exclude_packages))
         .getOrCreate()
     )
 
diff --git a/docs/changelog/next_release/341.feature.rst b/docs/changelog/next_release/341.feature.rst
new file mode 100644
index 000000000..0a3ca2625
--- /dev/null
+++ b/docs/changelog/next_release/341.feature.rst
@@ -0,0 +1,18 @@
+Add ``SparkS3.get_exclude_packages()`` and ``Kafka.get_exclude_packages()`` methods.
+Using them allows you to skip downloading dependencies that are not required by a specific connector, or that are already a part of Spark/PySpark:
+
+.. code:: python
+
+    from onetl.connection import SparkS3, Kafka
+
+    maven_packages = [
+        *SparkS3.get_packages(spark_version="3.5.4"),
+        *Kafka.get_packages(spark_version="3.5.4"),
+    ]
+    exclude_packages = SparkS3.get_exclude_packages() + Kafka.get_exclude_packages()
+    spark = (
+        SparkSession.builder.appName("spark_app_onetl_demo")
+        .config("spark.jars.packages", ",".join(maven_packages))
+        .config("spark.jars.excludes", ",".join(exclude_packages))
+        .getOrCreate()
+    )
diff --git a/docs/connection/db_connection/kafka/connection.rst b/docs/connection/db_connection/kafka/connection.rst
index 314cc758c..3f0edb985 100644
--- a/docs/connection/db_connection/kafka/connection.rst
+++ b/docs/connection/db_connection/kafka/connection.rst
@@ -6,4 +6,4 @@ Kafka Connection
 .. currentmodule:: onetl.connection.db_connection.kafka.connection
 
 .. autoclass:: Kafka
-    :members: get_packages, check, close
+    :members: get_packages, get_exclude_packages, check, close
diff --git a/docs/connection/file_df_connection/spark_s3/connection.rst b/docs/connection/file_df_connection/spark_s3/connection.rst
index cfa1aafb8..7b5de5213 100644
--- a/docs/connection/file_df_connection/spark_s3/connection.rst
+++ b/docs/connection/file_df_connection/spark_s3/connection.rst
@@ -6,4 +6,4 @@ Spark S3 Connection
 .. currentmodule:: onetl.connection.file_df_connection.spark_s3.connection
 
 .. autoclass:: SparkS3
-    :members: check, close, get_packages
+    :members: check, close, get_packages, get_exclude_packages
diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py
index 2d3e76637..db72b6a41 100644
--- a/onetl/connection/db_connection/kafka/connection.py
+++ b/onetl/connection/db_connection/kafka/connection.py
@@ -122,9 +122,11 @@ class Kafka(DBConnection):
 
         # Create Spark session with Kafka connector loaded
         maven_packages = Kafka.get_packages(spark_version="3.2.4")
+        exclude_packages = Kafka.get_exclude_packages()
         spark = (
            SparkSession.builder.appName("spark-app-name")
            .config("spark.jars.packages", ",".join(maven_packages))
+           .config("spark.jars.excludes", ",".join(exclude_packages))
            .getOrCreate()
         )
 
@@ -436,6 +438,40 @@ def get_packages(
             f"org.apache.spark:spark-sql-kafka-0-10_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}",
         ]
 
+    @slot
+    @classmethod
+    def get_exclude_packages(cls) -> list[str]:
+        """
+        Get package names to be excluded by Spark. |support_hooks|
+
+        .. versionadded:: 0.13.0
+
+        Examples
+        --------
+
+        .. code:: python
+
+            from onetl.connection import Kafka
+
+            Kafka.get_exclude_packages()
+
+        """
+
+        return [
+            # already a part of Spark bundle
+            "org.apache.hadoop:hadoop-client-api",
+            "org.apache.hadoop:hadoop-client-runtime",
+            "com.fasterxml.jackson.core:jackson-annotations",
+            "com.fasterxml.jackson.core:jackson-core",
+            "com.fasterxml.jackson.core:jackson-databind",
+            "com.google.code.findbugs:jsr305",
+            "commons-codec:commons-codec",
+            "commons-logging:commons-logging",
+            "org.lz4:lz4-java",
+            "org.slf4j:slf4j-api",
+            "org.xerial.snappy:snappy-java",
+        ]
+
     def __enter__(self):
         return self
 
diff --git a/onetl/connection/file_df_connection/spark_s3/connection.py b/onetl/connection/file_df_connection/spark_s3/connection.py
index 2b267a605..6b2df8890 100644
--- a/onetl/connection/file_df_connection/spark_s3/connection.py
+++ b/onetl/connection/file_df_connection/spark_s3/connection.py
@@ -134,13 +134,8 @@ class SparkS3(SparkFileDFConnection):
 
         # Create Spark session with Hadoop AWS libraries loaded
         maven_packages = SparkS3.get_packages(spark_version="3.5.4")
-        # Some dependencies are not used, but downloading takes a lot of time. Skipping them.
-        excluded_packages = [
-            "com.google.cloud.bigdataoss:gcs-connector",
-            "org.apache.hadoop:hadoop-aliyun",
-            "org.apache.hadoop:hadoop-azure-datalake",
-            "org.apache.hadoop:hadoop-azure",
-        ]
+        # Some packages are not used, but downloading them takes a lot of time. Skipping them.
+        excluded_packages = SparkS3.get_exclude_packages()
         spark = (
            SparkSession.builder.appName("spark-app-name")
            .config("spark.jars.packages", ",".join(maven_packages))
@@ -254,6 +249,50 @@ def get_packages(
         # https://mvnrepository.com/artifact/org.apache.spark/spark-hadoop-cloud
         return [f"org.apache.spark:spark-hadoop-cloud_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}"]
 
+    @slot
+    @classmethod
+    def get_exclude_packages(cls) -> list[str]:
+        """
+        Get package names to be excluded by Spark. |support_hooks|
+
+        .. versionadded:: 0.13.0
+
+        Examples
+        --------
+
+        .. code:: python
+
+            from onetl.connection import SparkS3
+
+            SparkS3.get_exclude_packages()
+
+        """
+
+        return [
+            # heavy and not used
+            "com.google.cloud.bigdataoss:gcs-connector",
+            "org.apache.hadoop:hadoop-aliyun",
+            "org.apache.hadoop:hadoop-azure-datalake",
+            "org.apache.hadoop:hadoop-azure",
+            "org.apache.hadoop:hadoop-cos",
+            "org.apache.hadoop:hadoop-openstack",
+            "org.apache.hadoop:hadoop-huaweicloud",
+            # already a part of Spark bundle
+            "org.apache.hadoop:hadoop-client-api",
+            "org.apache.hadoop:hadoop-client-runtime",
+            "com.fasterxml.jackson.core:jackson-annotations",
+            "com.fasterxml.jackson.core:jackson-core",
+            "com.fasterxml.jackson.core:jackson-databind",
+            "com.google.code.findbugs:jsr305",
+            "commons-codec:commons-codec",
+            "commons-logging:commons-logging",
+            "joda-time:joda-time",
+            "org.apache.httpcomponents:httpclient",
+            "org.apache.httpcomponents:httpcore",
+            "org.slf4j:slf4j-api",
+            "org.xerial.snappy:snappy-java",
+        ]
+
     @slot
     def path_from_string(self, path: os.PathLike | str) -> RemotePath:
         return RemotePath(os.fspath(path))
diff --git a/tests/fixtures/spark.py b/tests/fixtures/spark.py
index 760b0e4f9..17508d103 100644
--- a/tests/fixtures/spark.py
+++ b/tests/fixtures/spark.py
@@ -121,12 +121,11 @@ def maven_packages(request):
 
 @pytest.fixture(scope="session")
 def excluded_packages():
-    # These packages are a part of org.apache.spark:spark-hadoop-cloud, but not used in tests
+    from onetl.connection import Kafka, SparkS3
+
     return [
-        "com.google.cloud.bigdataoss:gcs-connector",
-        "org.apache.hadoop:hadoop-aliyun",
-        "org.apache.hadoop:hadoop-azure-datalake",
-        "org.apache.hadoop:hadoop-azure",
+        *SparkS3.get_exclude_packages(),
+        *Kafka.get_exclude_packages(),
     ]
 
 
diff --git a/tests/tests_unit/test_db/test_db_reader_unit/test_greenplum_reader_unit.py b/tests/tests_unit/test_db/test_db_reader_unit/test_greenplum_reader_unit.py
index 7ad9a1a60..9e062156f 100644
--- a/tests/tests_unit/test_db/test_db_reader_unit/test_greenplum_reader_unit.py
+++ b/tests/tests_unit/test_db/test_db_reader_unit/test_greenplum_reader_unit.py
@@ -46,7 +46,7 @@ def test_greenplum_reader_wrong_table_name(spark_mock):
     )
 
 
-def test_postgres_reader_hint_unsupported(spark_mock):
+def test_greenplum_reader_hint_unsupported(spark_mock):
     greenplum = Greenplum(host="some_host", user="user", database="database", password="passwd", spark=spark_mock)
     with pytest.raises(