From c72f381d00397fdce8b81039ac15ce2c6dd52407 Mon Sep 17 00:00:00 2001
From: Danny Meijer
Date: Tue, 26 Nov 2024 11:46:41 +0100
Subject: [PATCH] Fix/delta merge builder instance check for connect + util fix (#130)

## Description

Additional fixes for #101 and #102

## Related Issue

#101, #102

## Motivation and Context

## How Has This Been Tested?

## Screenshots (if appropriate):

## Types of changes

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Checklist:

- [x] My code follows the code style of this project.
- [ ] My change requires a change to the documentation.
- [ ] I have updated the documentation accordingly.
- [x] I have read the **CONTRIBUTING** document.
- [ ] I have added tests to cover my changes.
- [x] All new and existing tests passed.

---------

Co-authored-by: Danny Meijer <10511979+dannymeijer@users.noreply.github.com>
---
 src/koheesio/spark/utils/connect.py       | 5 ++++-
 src/koheesio/spark/writers/delta/batch.py | 7 +++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/koheesio/spark/utils/connect.py b/src/koheesio/spark/utils/connect.py
index 9cf7f028..d5728902 100644
--- a/src/koheesio/spark/utils/connect.py
+++ b/src/koheesio/spark/utils/connect.py
@@ -14,6 +14,9 @@ def is_remote_session(spark: Optional[SparkSession] = None) -> bool:
     result = False
 
     if (_spark := spark or get_active_session()) and check_if_pyspark_connect_is_supported():
-        result = True if _spark.conf.get("spark.remote", None) else False  # type: ignore
+        # result = True if _spark.conf.get("spark.remote", None) else False  # type: ignore
+        from pyspark.sql.connect.session import SparkSession as ConnectSparkSession
+
+        result = isinstance(_spark, ConnectSparkSession)
 
     return result
diff --git a/src/koheesio/spark/writers/delta/batch.py b/src/koheesio/spark/writers/delta/batch.py
index 5f66df99..2c124cd5 100644
--- a/src/koheesio/spark/writers/delta/batch.py
+++ b/src/koheesio/spark/writers/delta/batch.py
@@ -323,7 +323,10 @@ def _validate_params(cls, params: dict) -> dict:
                 clause = merge_conf.get("clause")
                 if clause not in valid_clauses:
                     raise ValueError(f"Invalid merge clause '{clause}' provided")
-        elif not isinstance(merge_builder, DeltaMergeBuilder):
+        elif (
+            not isinstance(merge_builder, DeltaMergeBuilder)
+            and not type(merge_builder).__name__ == "DeltaMergeBuilder"
+        ):
             raise ValueError("merge_builder must be a list or merge clauses or a DeltaMergeBuilder instance")
 
         return params
@@ -378,7 +381,7 @@ def execute(self) -> Writer.Output:
         if self.table.create_if_not_exists and not self.table.exists:
             _writer = _writer.options(**self.table.default_create_properties)
 
-        if isinstance(_writer, DeltaMergeBuilder):
+        if isinstance(_writer, DeltaMergeBuilder) or type(_writer).__name__ == "DeltaMergeBuilder":
             _writer.execute()
         else:
             if options := self.params:
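
Reviewer note, not part of the patch: the `batch.py` change keeps the `isinstance` check but adds a fallback on the class name, presumably because under Spark Connect the merge builder is a different class than `delta.tables.DeltaMergeBuilder`, so `isinstance` alone would reject it. A minimal sketch of that situation is below; `fake_builder` is a hypothetical stand-in that only assumes the Connect-side builder's class is also named `DeltaMergeBuilder`.

```python
# Illustrative sketch only, not part of the patch.
from delta.tables import DeltaMergeBuilder

# Build an instance of an unrelated class that merely shares the *name*
# "DeltaMergeBuilder", standing in for the builder a Connect client would return.
fake_builder = type("DeltaMergeBuilder", (), {})()

# isinstance() rejects it, because the two classes are different objects...
assert not isinstance(fake_builder, DeltaMergeBuilder)
# ...but the name-based fallback used in _validate_params and execute() accepts it.
assert type(fake_builder).__name__ == "DeltaMergeBuilder"
```

The `connect.py` change relies on the same kind of class check: rather than reading the `spark.remote` conf, `is_remote_session` now tests whether the active session is an instance of `pyspark.sql.connect.session.SparkSession`. A quick sanity check against a plain local session (assuming `pyspark` and `koheesio` are installed) could look like this:

```python
from pyspark.sql import SparkSession

from koheesio.spark.utils.connect import is_remote_session

# A regular local session is not a Connect session, so the check returns False.
spark = SparkSession.builder.master("local[1]").getOrCreate()
assert is_remote_session(spark) is False
```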