diff --git a/README.rst b/README.rst
index 4cd67e75..50219a2b 100644
--- a/README.rst
+++ b/README.rst
@@ -552,9 +552,11 @@ Read files directly from S3 path, convert them to dataframe, transform it and th
 
     # Initialize new SparkSession with Hadoop AWS libraries and Postgres driver loaded
    maven_packages = SparkS3.get_packages(spark_version="3.5.4") + Postgres.get_packages()
+    exclude_packages = SparkS3.get_exclude_packages()
     spark = (
         SparkSession.builder.appName("spark_app_onetl_demo")
         .config("spark.jars.packages", ",".join(maven_packages))
+        .config("spark.jars.excludes", ",".join(exclude_packages))
         .getOrCreate()
     )
 
diff --git a/docs/changelog/next_release/341.feature.rst b/docs/changelog/next_release/341.feature.rst
new file mode 100644
index 00000000..0a3ca262
--- /dev/null
+++ b/docs/changelog/next_release/341.feature.rst
@@ -0,0 +1,19 @@
+Add ``SparkS3.get_exclude_packages()`` and ``Kafka.get_exclude_packages()`` methods.
+Using them allows you to skip downloading dependencies which are not required by a specific connector, or which are already a part of Spark/PySpark:
+
+.. code:: python
+
+    from onetl.connection import SparkS3, Kafka
+    from pyspark.sql import SparkSession
+
+    maven_packages = [
+        *SparkS3.get_packages(spark_version="3.5.4"),
+        *Kafka.get_packages(spark_version="3.5.4"),
+    ]
+    exclude_packages = SparkS3.get_exclude_packages() + Kafka.get_exclude_packages()
+    spark = (
+        SparkSession.builder.appName("spark_app_onetl_demo")
+        .config("spark.jars.packages", ",".join(maven_packages))
+        .config("spark.jars.excludes", ",".join(exclude_packages))
+        .getOrCreate()
+    )
diff --git a/docs/connection/db_connection/kafka/connection.rst b/docs/connection/db_connection/kafka/connection.rst
index 314cc758..3f0edb98 100644
--- a/docs/connection/db_connection/kafka/connection.rst
+++ b/docs/connection/db_connection/kafka/connection.rst
@@ -6,4 +6,4 @@ Kafka Connection
 .. currentmodule:: onetl.connection.db_connection.kafka.connection
 
 .. autoclass:: Kafka
-    :members: get_packages, check, close
+    :members: get_packages, get_exclude_packages, check, close
diff --git a/docs/connection/file_df_connection/spark_s3/connection.rst b/docs/connection/file_df_connection/spark_s3/connection.rst
index cfa1aafb..7b5de521 100644
--- a/docs/connection/file_df_connection/spark_s3/connection.rst
+++ b/docs/connection/file_df_connection/spark_s3/connection.rst
@@ -6,4 +6,4 @@ Spark S3 Connection
 .. currentmodule:: onetl.connection.file_df_connection.spark_s3.connection
 
 .. autoclass:: SparkS3
-    :members: check, close, get_packages
+    :members: check, close, get_packages, get_exclude_packages
diff --git a/onetl/connection/db_connection/kafka/connection.py b/onetl/connection/db_connection/kafka/connection.py
index 2d3e7663..db72b6a4 100644
--- a/onetl/connection/db_connection/kafka/connection.py
+++ b/onetl/connection/db_connection/kafka/connection.py
@@ -122,9 +122,11 @@ class Kafka(DBConnection):
 
         # Create Spark session with Kafka connector loaded
         maven_packages = Kafka.get_packages(spark_version="3.2.4")
+        exclude_packages = Kafka.get_exclude_packages()
         spark = (
             SparkSession.builder.appName("spark-app-name")
             .config("spark.jars.packages", ",".join(maven_packages))
+            .config("spark.jars.excludes", ",".join(exclude_packages))
             .getOrCreate()
         )
 
@@ -436,6 +438,40 @@ def get_packages(
             f"org.apache.spark:spark-sql-kafka-0-10_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}",
         ]
 
+    @slot
+    @classmethod
+    def get_exclude_packages(cls) -> list[str]:
+        """
+        Get package names to be excluded by Spark. |support_hooks|
+
+        .. versionadded:: 0.13.0
+
+        Examples
+        --------
+
+        .. code:: python
+
+            from onetl.connection import Kafka
+
+            Kafka.get_exclude_packages()
+
+        """
+
+        return [
+            # already a part of Spark bundle
+            "org.apache.hadoop:hadoop-client-api",
+            "org.apache.hadoop:hadoop-client-runtime",
+            "com.fasterxml.jackson.core:jackson-annotations",
+            "com.fasterxml.jackson.core:jackson-core",
+            "com.fasterxml.jackson.core:jackson-databind",
+            "com.google.code.findbugs:jsr305",
+            "commons-codec:commons-codec",
+            "commons-logging:commons-logging",
+            "org.lz4:lz4-java",
+            "org.slf4j:slf4j-api",
+            "org.xerial.snappy:snappy-java",
+        ]
+
     def __enter__(self):
         return self
 
diff --git a/onetl/connection/file_df_connection/spark_s3/connection.py b/onetl/connection/file_df_connection/spark_s3/connection.py
index 2b267a60..6b2df889 100644
--- a/onetl/connection/file_df_connection/spark_s3/connection.py
+++ b/onetl/connection/file_df_connection/spark_s3/connection.py
@@ -134,13 +134,8 @@ class SparkS3(SparkFileDFConnection):
 
         # Create Spark session with Hadoop AWS libraries loaded
         maven_packages = SparkS3.get_packages(spark_version="3.5.4")
-        # Some dependencies are not used, but downloading takes a lot of time. Skipping them.
-        excluded_packages = [
-            "com.google.cloud.bigdataoss:gcs-connector",
-            "org.apache.hadoop:hadoop-aliyun",
-            "org.apache.hadoop:hadoop-azure-datalake",
-            "org.apache.hadoop:hadoop-azure",
-        ]
+        # Some packages are not used, but downloading takes a lot of time. Skipping them.
+        excluded_packages = SparkS3.get_exclude_packages()
         spark = (
             SparkSession.builder.appName("spark-app-name")
             .config("spark.jars.packages", ",".join(maven_packages))
@@ -254,6 +249,50 @@ def get_packages(
         # https://mvnrepository.com/artifact/org.apache.spark/spark-hadoop-cloud
         return [f"org.apache.spark:spark-hadoop-cloud_{scala_ver.format('{0}.{1}')}:{spark_ver.format('{0}.{1}.{2}')}"]
 
+    @slot
+    @classmethod
+    def get_exclude_packages(cls) -> list[str]:
+        """
+        Get package names to be excluded by Spark. |support_hooks|
+
+        .. versionadded:: 0.13.0
+
+        Examples
+        --------
+
+        .. code:: python
+
+            from onetl.connection import SparkS3
+
+            SparkS3.get_exclude_packages()
+
+        """
+
+        return [
+            # heavy and not used
+            "com.google.cloud.bigdataoss:gcs-connector",
+            "org.apache.hadoop:hadoop-aliyun",
+            "org.apache.hadoop:hadoop-azure-datalake",
+            "org.apache.hadoop:hadoop-azure",
+            "org.apache.hadoop:hadoop-cos",
+            "org.apache.hadoop:hadoop-openstack",
+            "org.apache.hadoop:hadoop-huaweicloud",
+            # already a part of Spark bundle
+            "org.apache.hadoop:hadoop-client-api",
+            "org.apache.hadoop:hadoop-client-runtime",
+            "com.fasterxml.jackson.core:jackson-annotations",
+            "com.fasterxml.jackson.core:jackson-core",
+            "com.fasterxml.jackson.core:jackson-databind",
+            "com.google.code.findbugs:jsr305",
+            "commons-codec:commons-codec",
+            "commons-logging:commons-logging",
+            "joda-time:joda-time",
+            "org.apache.httpcomponents:httpclient",
+            "org.apache.httpcomponents:httpcore",
+            "org.slf4j:slf4j-api",
+            "org.xerial.snappy:snappy-java",
+        ]
+
     @slot
     def path_from_string(self, path: os.PathLike | str) -> RemotePath:
         return RemotePath(os.fspath(path))
diff --git a/tests/fixtures/spark.py b/tests/fixtures/spark.py
index 760b0e4f..17508d10 100644
--- a/tests/fixtures/spark.py
+++ b/tests/fixtures/spark.py
@@ -121,12 +121,11 @@ def maven_packages(request):
 
 @pytest.fixture(scope="session")
 def excluded_packages():
-    # These packages are a part of org.apache.spark:spark-hadoop-cloud, but not used in tests
+    from onetl.connection import Kafka, SparkS3
+
     return [
-        "com.google.cloud.bigdataoss:gcs-connector",
-        "org.apache.hadoop:hadoop-aliyun",
-        "org.apache.hadoop:hadoop-azure-datalake",
-        "org.apache.hadoop:hadoop-azure",
+        *SparkS3.get_exclude_packages(),
+        *Kafka.get_exclude_packages(),
     ]
 
 
diff --git a/tests/tests_unit/test_db/test_db_reader_unit/test_greenplum_reader_unit.py b/tests/tests_unit/test_db/test_db_reader_unit/test_greenplum_reader_unit.py
index 7ad9a1a6..9e062156 100644
--- a/tests/tests_unit/test_db/test_db_reader_unit/test_greenplum_reader_unit.py
+++ b/tests/tests_unit/test_db/test_db_reader_unit/test_greenplum_reader_unit.py
@@ -46,7 +46,7 @@ def test_greenplum_reader_wrong_table_name(spark_mock):
         )
 
 
-def test_postgres_reader_hint_unsupported(spark_mock):
+def test_greenplum_reader_hint_unsupported(spark_mock):
     greenplum = Greenplum(host="some_host", user="user", database="database", password="passwd", spark=spark_mock)
     with pytest.raises(
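
For context, a minimal end-to-end sketch (not part of the patch) of how the two new methods combine in downstream code. The Spark version and app name are illustrative; the de-duplication step is optional, since repeating an entry in ``spark.jars.excludes`` should be harmless, but it keeps the config value readable when several connectors share excluded artifacts:

.. code:: python

    from onetl.connection import Kafka, SparkS3
    from pyspark.sql import SparkSession

    # Spark version below is illustrative; it should match the installed PySpark version.
    maven_packages = [
        *SparkS3.get_packages(spark_version="3.5.4"),
        *Kafka.get_packages(spark_version="3.5.4"),
    ]

    # Both exclude lists name some of the same artifacts (hadoop-client-api, Jackson, slf4j, snappy),
    # so drop duplicates while preserving order before building the comma-separated config value.
    exclude_packages = list(dict.fromkeys(SparkS3.get_exclude_packages() + Kafka.get_exclude_packages()))

    spark = (
        SparkSession.builder.appName("spark_app_onetl_demo")
        .config("spark.jars.packages", ",".join(maven_packages))
        .config("spark.jars.excludes", ",".join(exclude_packages))
        .getOrCreate()
    )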